This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new cf28445c76 [INLONG-8358][Sort] Add kafka connector on flink 1.15 (#8713)
cf28445c76 is described below

commit cf28445c7627be0ffbe627f10b274bca3476011a
Author: Hao <1780095+hnrai...@users.noreply.github.com>
AuthorDate: Tue Dec 12 11:03:36 2023 +0800

    [INLONG-8358][Sort] Add kafka connector on flink 1.15 (#8713)
---
 .../src/main/assemblies/sort-connectors-v1.15.xml  |   8 +
 inlong-sort/sort-core/pom.xml                      |   6 +
 .../sort-end-to-end-tests-v1.15/pom.xml            |  14 +-
 ...MysqlToRocksITCase.java => KafkaE2EITCase.java} | 122 +++--
 .../inlong/sort/tests/MysqlToRocksITCase.java      |  13 +-
 .../test/resources/env/kafka_test_kafka_init.txt   |   1 +
 .../src/test/resources/flinkSql/kafka_test.sql     |  61 +++
 .../sort-flink-v1.15/sort-connectors/kafka/pom.xml | 179 +++++++
 .../org/apache/inlong/sort/kafka/KafkaOptions.java |  50 ++
 .../kafka/table/KafkaConnectorOptionsUtil.java     | 585 +++++++++++++++++++++
 .../sort/kafka/table/KafkaDynamicTableFactory.java | 442 ++++++++++++++++
 .../table/UpsertKafkaDynamicTableFactory.java      | 416 +++++++++++++++
 .../org.apache.flink.table.factories.Factory       |  17 +
 .../sort-flink-v1.15/sort-connectors/pom.xml       |   1 +
 licenses/inlong-sort-connectors/LICENSE            |   6 +
 pom.xml                                            |  20 +-
 16 files changed, 1900 insertions(+), 41 deletions(-)

diff --git a/inlong-distribution/src/main/assemblies/sort-connectors-v1.15.xml b/inlong-distribution/src/main/assemblies/sort-connectors-v1.15.xml
index 855b66f858..1df930a929 100644
--- a/inlong-distribution/src/main/assemblies/sort-connectors-v1.15.xml
+++ b/inlong-distribution/src/main/assemblies/sort-connectors-v1.15.xml
@@ -105,6 +105,14 @@
             <includes>
                 <include>sort-connector-hudi-v1.15-${project.version}.jar</include>
             </includes>
+        <fileMode>0644</fileMode>
+        </fileSet>
+        <fileSet>
+            <directory>../inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/target</directory>
+            <outputDirectory>inlong-sort/connectors</outputDirectory>
+            <includes>
+                <include>sort-connector-kafka-v1.15-${project.version}.jar</include>
+            </includes>
             <fileMode>0644</fileMode>
         </fileSet>
     </fileSets>
diff --git a/inlong-sort/sort-core/pom.xml b/inlong-sort/sort-core/pom.xml
index a18d177315..e8974e29f0 100644
--- a/inlong-sort/sort-core/pom.xml
+++ b/inlong-sort/sort-core/pom.xml
@@ -299,6 +299,12 @@
                     <version>${project.version}</version>
                     <scope>test</scope>
                 </dependency>
+                <dependency>
+                    <groupId>org.apache.inlong</groupId>
+                    <artifactId>sort-connector-kafka-v1.15</artifactId>
+                    <version>${project.version}</version>
+                    <scope>test</scope>
+                </dependency>
             </dependencies>
             <build>
                 <plugins>
diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/pom.xml b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/pom.xml
index 07d8663682..abdb6e6500 100644
--- a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/pom.xml
+++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/pom.xml
@@ -46,12 +46,14 @@
         <dependency>
             <groupId>org.testcontainers</groupId>
             <artifactId>postgresql</artifactId>
-            <version>${testcontainers.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>kafka</artifactId>
         </dependency>
         <dependency>
             <groupId>org.testcontainers</groupId>
             <artifactId>mongodb</artifactId>
-            <version>${testcontainers.version}</version>
         </dependency>
         <dependency>
             <groupId>org.mongodb</groupId>
@@ -223,6 +225,14 @@
                             <type>jar</type>
                             <outputDirectory>${project.build.directory}/dependencies</outputDirectory>
                         </artifactItem>
+                        <artifactItem>
+                            <groupId>org.apache.inlong</groupId>
+                            <artifactId>sort-connector-kafka-v1.15</artifactId>
+                            <version>${project.version}</version>
+                            <destFileName>sort-connector-kafka.jar</destFileName>
+                            <type>jar</type>
+                            <outputDirectory>${project.build.directory}/dependencies</outputDirectory>
+                        </artifactItem>
                     </artifactItems>
                 </configuration>
                 <executions>
diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/MysqlToRocksITCase.java b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/KafkaE2EITCase.java
similarity index 53%
copy from inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/MysqlToRocksITCase.java
copy to inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/KafkaE2EITCase.java
index bbeccd04a5..1399fe2f6f 100644
--- a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/MysqlToRocksITCase.java
+++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/KafkaE2EITCase.java
@@ -20,6 +20,7 @@ package org.apache.inlong.sort.tests;
 import org.apache.inlong.sort.tests.utils.FlinkContainerTestEnv;
 import org.apache.inlong.sort.tests.utils.JdbcProxy;
 import org.apache.inlong.sort.tests.utils.MySqlContainer;
+import org.apache.inlong.sort.tests.utils.PlaceholderResolver;
 import org.apache.inlong.sort.tests.utils.StarRocksContainer;
 import org.apache.inlong.sort.tests.utils.TestUtils;
 
@@ -29,9 +30,17 @@ import org.junit.ClassRule;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container.ExecResult;
+import org.testcontainers.containers.KafkaContainer;
 import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.utility.DockerImageName;
 
+import java.io.IOException;
+import java.net.URI;
 import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.sql.Connection;
@@ -40,7 +49,10 @@ import java.sql.SQLException;
 import java.sql.Statement;
 import java.time.Duration;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Objects;
 
 import static org.apache.inlong.sort.tests.utils.StarRocksManager.INTER_CONTAINER_STAR_ROCKS_ALIAS;
 import static org.apache.inlong.sort.tests.utils.StarRocksManager.STAR_ROCKS_LOG;
@@ -49,28 +61,42 @@ import static org.apache.inlong.sort.tests.utils.StarRocksManager.getNewStarRock
 import static org.apache.inlong.sort.tests.utils.StarRocksManager.initializeStarRocksTable;
 
 /**
- * End-to-end tests for sort-connector-postgres-cdc-v1.15 uber jar.
- * Test flink sql Mysql cdc to StarRocks
+ * End-to-end tests for sort-connector-kafka uber jar.
  */
-public class MysqlToRocksITCase extends FlinkContainerTestEnv {
+public class KafkaE2EITCase extends FlinkContainerTestEnv {
 
-    private static final Logger LOG = LoggerFactory.getLogger(MysqlToRocksITCase.class);
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaE2EITCase.class);
 
+    public static final Logger MYSQL_LOG = LoggerFactory.getLogger(MySqlContainer.class);
+
+    public static final Logger KAFKA_LOG = LoggerFactory.getLogger(KafkaContainer.class);
+
+    private static final Path kafkaJar = TestUtils.getResource("sort-connector-kafka.jar");
     private static final Path mysqlJar = TestUtils.getResource("sort-connector-mysql-cdc.jar");
-    private static final Path jdbcJar = TestUtils.getResource("sort-connector-starrocks.jar");
+    private static final Path starrocksJar = TestUtils.getResource("sort-connector-starrocks.jar");
     private static final Path mysqlJdbcJar = TestUtils.getResource("mysql-driver.jar");
+
     private static final String sqlFile;
 
     static {
         try {
-            sqlFile =
-                    Paths.get(MysqlToRocksITCase.class.getResource("/flinkSql/mysql_test.sql").toURI()).toString();
+            URI kafkaSqlFile =
+                    Objects.requireNonNull(KafkaE2EITCase.class.getResource("/flinkSql/kafka_test.sql")).toURI();
+            sqlFile = Paths.get(kafkaSqlFile).toString();
             buildStarRocksImage();
         } catch (URISyntaxException e) {
             throw new RuntimeException(e);
         }
     }
 
+    @ClassRule
+    public static final KafkaContainer KAFKA =
+            new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"))
+                    .withNetwork(NETWORK)
+                    .withNetworkAliases("kafka")
+                    .withEmbeddedZookeeper()
+                    .withLogConsumer(new Slf4jLogConsumer(KAFKA_LOG));
+
     @ClassRule
     public static StarRocksContainer STAR_ROCKS =
             (StarRocksContainer) new StarRocksContainer(getNewStarRocksImageName())
@@ -86,7 +112,7 @@ public class MysqlToRocksITCase extends FlinkContainerTestEnv {
                     .withDatabaseName("test")
                     .withNetwork(NETWORK)
                     .withNetworkAliases("mysql")
-                    .withLogConsumer(new Slf4jLogConsumer(LOG));
+                    .withLogConsumer(new Slf4jLogConsumer(MYSQL_LOG));
 
     @Before
     public void setup() {
@@ -103,7 +129,7 @@ public class MysqlToRocksITCase extends FlinkContainerTestEnv {
                             MYSQL_CONTAINER.getPassword());
             Statement stat = conn.createStatement();
             stat.execute(
-                    "CREATE TABLE test_input1 (\n"
+                    "CREATE TABLE test_input (\n"
                             + "  id SERIAL,\n"
                             + "  name VARCHAR(255) NOT NULL DEFAULT 'flink',\n"
                             + "  description VARCHAR(512),\n"
@@ -118,49 +144,83 @@ public class MysqlToRocksITCase extends FlinkContainerTestEnv {
 
     @AfterClass
     public static void teardown() {
+        if (KAFKA != null) {
+            KAFKA.stop();
+        }
+
         if (MYSQL_CONTAINER != null) {
             MYSQL_CONTAINER.stop();
         }
+
         if (STAR_ROCKS != null) {
             STAR_ROCKS.stop();
         }
     }
 
+    private void initializeKafkaTable(String topic) {
+        String fileName = "kafka_test_kafka_init.txt";
+        int port = KafkaContainer.ZOOKEEPER_PORT;
+
+        Map<String, Object> properties = new HashMap<>();
+        properties.put("TOPIC", topic);
+        properties.put("ZOOKEEPER_PORT", port);
+
+        try {
+            String createKafkaStatement = getCreateStatement(fileName, properties);
+            ExecResult result = KAFKA.execInContainer("bash", "-c", createKafkaStatement);
+            LOG.info("Create kafka topic: {}, std: {}", createKafkaStatement, result.getStdout());
+            if (result.getExitCode() != 0) {
+                throw new RuntimeException("Init kafka topic failed. Exit code:" + result.getExitCode());
+            }
+        } catch (IOException | InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private String getCreateStatement(String fileName, Map<String, Object> properties) {
+        URL url = Objects.requireNonNull(KafkaE2EITCase.class.getResource("/env/" + fileName));
+
+        try {
+            Path file = Paths.get(url.toURI());
+            return PlaceholderResolver.getDefaultResolver().resolveByMap(
+                    new String(Files.readAllBytes(file), StandardCharsets.UTF_8),
+                    properties);
+        } catch (IOException | URISyntaxException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
     /**
-     * Test flink sql postgresql cdc to StarRocks
+     * Test flink sql MySQL CDC to Kafka to StarRocks.
      *
-     * @throws Exception The exception may throws when execute the case
+     * @throws Exception The exception may be thrown when executing the case
      */
     @Test
-    public void testMysqlUpdateAndDelete() throws Exception {
-        submitSQLJob(sqlFile, jdbcJar, mysqlJar, mysqlJdbcJar);
+    public void testKafkaWithSqlFile() throws Exception {
+        final String topic = "test-topic";
+        initializeKafkaTable(topic);
+
+        submitSQLJob(sqlFile, kafkaJar, starrocksJar, mysqlJar, mysqlJdbcJar);
         waitUntilJobRunning(Duration.ofSeconds(10));
 
         // generate input
-        try (Connection conn =
-                DriverManager.getConnection(MYSQL_CONTAINER.getJdbcUrl(), MYSQL_CONTAINER.getUsername(),
-                        MYSQL_CONTAINER.getPassword());
+        try (Connection conn = DriverManager.getConnection(MYSQL_CONTAINER.getJdbcUrl(),
+                MYSQL_CONTAINER.getUsername(), MYSQL_CONTAINER.getPassword());
                 Statement stat = conn.createStatement()) {
-            stat.execute(
-                    "INSERT INTO test_input1 "
-                            + "VALUES (1,'jacket','water resistent white wind breaker');");
-            stat.execute(
-                    "INSERT INTO test_input1 VALUES (2,'scooter','Big 2-wheel scooter ');");
-            stat.execute(
-                    "update test_input1 set name = 'tom' where id = 2;");
-            stat.execute(
-                    "delete from test_input1 where id = 1;");
+            stat.execute("INSERT INTO test_input VALUES (1,'jacket','water resistant white wind breaker');");
+            stat.execute("INSERT INTO test_input VALUES (2,'scooter','Big 2-wheel scooter ');");
         } catch (SQLException e) {
             LOG.error("Update table for CDC failed.", e);
             throw e;
         }
 
-        JdbcProxy proxy =
-                new JdbcProxy(STAR_ROCKS.getJdbcUrl(), STAR_ROCKS.getUsername(),
-                        STAR_ROCKS.getPassword(),
-                        STAR_ROCKS.getDriverClassName());
-        List<String> expectResult =
-                Arrays.asList("2,tom,Big 2-wheel scooter ");
+        JdbcProxy proxy = new JdbcProxy(STAR_ROCKS.getJdbcUrl(), STAR_ROCKS.getUsername(),
+                STAR_ROCKS.getPassword(),
+                STAR_ROCKS.getDriverClassName());
+
+        List<String> expectResult = Arrays.asList(
+                "1,jacket,water resistant white wind breaker",
+                "2,scooter,Big 2-wheel scooter ");
         proxy.checkResultWithTimeout(
                 expectResult,
                 "test_output1",
diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/MysqlToRocksITCase.java b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/MysqlToRocksITCase.java
index bbeccd04a5..51501772a9 100644
--- a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/MysqlToRocksITCase.java
+++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/MysqlToRocksITCase.java
@@ -39,7 +39,7 @@ import java.sql.DriverManager;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.time.Duration;
-import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 
 import static org.apache.inlong.sort.tests.utils.StarRocksManager.INTER_CONTAINER_STAR_ROCKS_ALIAS;
@@ -155,12 +155,11 @@ public class MysqlToRocksITCase extends FlinkContainerTestEnv {
             throw e;
         }
 
-        JdbcProxy proxy =
-                new JdbcProxy(STAR_ROCKS.getJdbcUrl(), STAR_ROCKS.getUsername(),
-                        STAR_ROCKS.getPassword(),
-                        STAR_ROCKS.getDriverClassName());
-        List<String> expectResult =
-                Arrays.asList("2,tom,Big 2-wheel scooter ");
+        JdbcProxy proxy = new JdbcProxy(STAR_ROCKS.getJdbcUrl(), STAR_ROCKS.getUsername(),
+                STAR_ROCKS.getPassword(),
+                STAR_ROCKS.getDriverClassName());
+
+        List<String> expectResult = Collections.singletonList("2,tom,Big 2-wheel scooter ");
         proxy.checkResultWithTimeout(
                 expectResult,
                 "test_output1",
diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/env/kafka_test_kafka_init.txt b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/env/kafka_test_kafka_init.txt
new file mode 100644
index 0000000000..b2f31d78fa
--- /dev/null
+++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/env/kafka_test_kafka_init.txt
@@ -0,0 +1 @@
+kafka-topics --create --topic ${TOPIC} --replication-factor 1 --partitions 1 --zookeeper localhost:${ZOOKEEPER_PORT}
\ No newline at end of file
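
The init script above is a template: KafkaE2EITCase fills in ${TOPIC} and ${ZOOKEEPER_PORT} via
PlaceholderResolver.getDefaultResolver().resolveByMap(...) and then executes the resolved command
inside the Kafka container. A minimal standalone sketch of that substitution step; the replace-based
resolver below is illustrative only, not InLong's actual PlaceholderResolver:

import java.util.HashMap;
import java.util.Map;

public class KafkaInitTemplateSketch {

    // Illustrative stand-in for PlaceholderResolver#resolveByMap as used in KafkaE2EITCase.
    static String resolve(String template, Map<String, Object> properties) {
        String resolved = template;
        for (Map.Entry<String, Object> entry : properties.entrySet()) {
            resolved = resolved.replace("${" + entry.getKey() + "}", String.valueOf(entry.getValue()));
        }
        return resolved;
    }

    public static void main(String[] args) {
        String template = "kafka-topics --create --topic ${TOPIC} --replication-factor 1 "
                + "--partitions 1 --zookeeper localhost:${ZOOKEEPER_PORT}";
        Map<String, Object> properties = new HashMap<>();
        properties.put("TOPIC", "test-topic");
        properties.put("ZOOKEEPER_PORT", 2181); // value of KafkaContainer.ZOOKEEPER_PORT
        // Prints the command the test runs inside the container to create the topic.
        System.out.println(resolve(template, properties));
    }
}
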
diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/flinkSql/kafka_test.sql b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/flinkSql/kafka_test.sql
new file mode 100644
index 0000000000..5bda3b9366
--- /dev/null
+++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/flinkSql/kafka_test.sql
@@ -0,0 +1,61 @@
+CREATE TABLE test_input (
+    `id` INT primary key,
+    name STRING,
+    description STRING
+) WITH (
+    'connector' = 'mysql-cdc-inlong',
+    'hostname' = 'mysql',
+    'port' = '3306',
+    'username' = 'root',
+    'password' = 'inlong',
+    'database-name' = 'test',
+    'table-name' = 'test_input',
+    'scan.incremental.snapshot.enabled' = 'false',
+    'jdbc.properties.useSSL' = 'false',
+    'jdbc.properties.allowPublicKeyRetrieval' = 'true'
+);
+
+CREATE TABLE kafka_load (
+    `id` INT NOT NULL primary key,
+    name STRING,
+    description STRING
+) WITH (
+    'connector' = 'upsert-kafka-inlong',
+    'topic' = 'test-topic',
+    'properties.bootstrap.servers' = 'kafka:9092',
+    'key.format' = 'csv',
+    'value.format' = 'csv'
+);
+
+CREATE TABLE kafka_extract (
+    `id` INT NOT NULL,
+    name STRING,
+    description STRING
+) WITH (
+    'connector' = 'kafka-inlong',
+    'topic' = 'test-topic',
+    'properties.bootstrap.servers' = 'kafka:9092',
+    'properties.group.id' = 'testGroup',
+    'scan.startup.mode' = 'earliest-offset',
+    'format' = 'csv'
+);
+
+CREATE TABLE test_output (
+    `id` INT primary key,
+    name STRING,
+    description STRING
+) WITH (
+    'connector' = 'starrocks-inlong',
+    'jdbc-url' = 'jdbc:mysql://starrocks:9030',
+    'load-url'='starrocks:8030',
+    'database-name'='test',
+    'table-name' = 'test_output1',
+    'username' = 'inlong',
+    'password' = 'inlong',
+    'sink.properties.format' = 'json',
+    'sink.properties.strip_outer_array' = 'true',
+    'sink.buffer-flush.interval-ms' = '1000'
+);
+
+INSERT INTO kafka_load select * from test_input;
+INSERT INTO test_output select * from kafka_extract;
\ No newline at end of file
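
The SQL above wires MySQL CDC into the 'test-topic' Kafka topic as CSV (via upsert-kafka-inlong),
reads it back with kafka-inlong, and lands it in StarRocks. A hedged standalone sketch of peeking at
the CSV records on the topic with a plain Kafka consumer; the bootstrap address is illustrative (the
E2E test itself would use KAFKA.getBootstrapServers()):

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class TestTopicPeek {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative address
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "peek");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test-topic"));
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
            for (ConsumerRecord<String, String> record : records) {
                // Expected CSV payloads, e.g. "1,jacket,water resistant white wind breaker"
                System.out.printf("key=%s value=%s%n", record.key(), record.value());
            }
        }
    }
}
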
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/pom.xml b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/pom.xml
new file mode 100644
index 0000000000..61fcc35247
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/pom.xml
@@ -0,0 +1,179 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.inlong</groupId>
+        <artifactId>sort-connectors-v1.15</artifactId>
+        <version>1.11.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>sort-connector-kafka-v1.15</artifactId>
+    <packaging>jar</packaging>
+    <name>Apache InLong - Sort-connector-kafka</name>
+
+    <properties>
+        <inlong.root.dir>${project.parent.parent.parent.parent.parent.basedir}</inlong.root.dir>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-kafka</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>sort-connector-base</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>sort-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>audit-sdk</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>shade-flink</id>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <phase>package</phase>
+                        <configuration>
+                            <artifactSet>
+                                <includes>
+                                    <include>org.apache.inlong:*</include>
+                                    <include>org.apache.kafka:*</include>
+                                    <include>org.apache.flink:flink-connector-kafka</include>
+                                    <include>org.apache.flink:flink-connector-base</include>
+                                    <!--  Include fixed version 18.0-13.0 of flink shaded guava  -->
+                                    <include>org.apache.flink:flink-shaded-guava</include>
+                                    <include>org.apache.httpcomponents:*</include>
+                                    <include>org.apache.commons:commons-lang3</include>
+                                    <include>com.google.protobuf:*</include>
+                                    <include>joda-time:*</include>
+                                    <include>com.fasterxml.jackson.core:*</include>
+                                    <include>com.amazonaws:*</include>
+                                    <include>software.amazon.ion:*</include>
+                                    <include>commons-logging:commons-logging</include>
+                                </includes>
+                            </artifactSet>
+                            <filters>
+                                <filter>
+                                    <artifact>org.apache.kafka:*</artifact>
+                                    <excludes>
+                                        <exclude>kafka/kafka-version.properties</exclude>
+                                        <exclude>LICENSE</exclude>
+                                        <!-- Does not contain anything relevant.
+                                            Cites a binary dependency on jersey, but this is neither reflected in the
+                                            dependency graph, nor are any jersey files bundled. -->
+                                        <exclude>NOTICE</exclude>
+                                        <exclude>common/**</exclude>
+                                    </excludes>
+                                </filter>
+                                <filter>
+                                    <artifact>org.apache.inlong:sort-connector-*</artifact>
+                                    <includes>
+                                        <include>org/apache/inlong/**</include>
+                                        <include>META-INF/services/org.apache.flink.table.factories.Factory</include>
+                                        <include>META-INF/services/org.apache.flink.table.factories.TableFactory</include>
+                                    </includes>
+                                </filter>
+                            </filters>
+                            <relocations>
+                                <relocation>
+                                    <pattern>org.apache.kafka</pattern>
+                                    <shadedPattern>org.apache.inlong.sort.kafka.shaded.org.apache.kafka</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>org.apache.commons.logging</pattern>
+                                    <shadedPattern>org.apache.inlong.sort.kafka.shaded.org.apache.commons.logging</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>org.apache.commons.lang3</pattern>
+                                    <shadedPattern>org.apache.inlong.sort.kafka.shaded.org.apache.commons.lang3</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>org.apache.http</pattern>
+                                    <shadedPattern>org.apache.inlong.sort.kafka.shaded.org.apache.http</shadedPattern>
+                                </relocation>
+
+                                <relocation>
+                                    <pattern>com.google</pattern>
+                                    <shadedPattern>org.apache.inlong.sort.kafka.shaded.com.google</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>com.amazonaws</pattern>
+                                    <shadedPattern>org.apache.inlong.sort.kafka.shaded.com.amazonaws</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>software.amazon.ion</pattern>
+                                    <shadedPattern>org.apache.inlong.sort.kafka.shaded.software.amazon.ion</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>com.fasterxml.jackson</pattern>
+                                    <shadedPattern>org.apache.inlong.sort.kafka.shaded.com.fasterxml.jackson</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>org.joda.time</pattern>
+                                    <shadedPattern>org.apache.inlong.sort.kafka.shaded.org.joda.time</shadedPattern>
+                                </relocation>
+
+                                <relocation>
+                                    <pattern>org.apache.inlong.sort.base</pattern>
+                                    <shadedPattern>org.apache.inlong.sort.kafka.shaded.org.apache.inlong.sort.base</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>org.apache.inlong.sort.configuration</pattern>
+                                    <shadedPattern>org.apache.inlong.sort.kafka.shaded.org.apache.inlong.sort.configuration</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>org.apache.inlong.sort.protocol</pattern>
+                                    <shadedPattern>org.apache.inlong.sort.kafka.shaded.org.apache.inlong.sort.protocol</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>org.apache.inlong.sort.schema</pattern>
+                                    <shadedPattern>org.apache.inlong.sort.kafka.shaded.org.apache.inlong.sort.schema</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>org.apache.inlong.sort.util</pattern>
+                                    <shadedPattern>org.apache.inlong.sort.kafka.shaded.org.apache.inlong.sort.util</shadedPattern>
+                                </relocation>
+                            </relocations>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
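
The shade configuration above bundles the Kafka client and a handful of transitive dependencies and
relocates them under org.apache.inlong.sort.kafka.shaded. A small illustrative check of the produced
uber jar (the jar path is an assumption; the class name simply follows the relocation rules declared
above):

import java.net.URL;
import java.net.URLClassLoader;

public class ShadedJarCheck {

    public static void main(String[] args) throws Exception {
        // Assumed location of the built artifact; the build places it under the module's target/ directory.
        URL jar = new URL("file:sort-connector-kafka-v1.15.jar");
        try (URLClassLoader loader = new URLClassLoader(new URL[]{jar}, null)) {
            // org.apache.kafka.* classes should only be reachable under the shaded package.
            loader.loadClass("org.apache.inlong.sort.kafka.shaded.org.apache.kafka.clients.producer.KafkaProducer");
            System.out.println("Kafka client classes are relocated as expected");
        }
    }
}
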
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/KafkaOptions.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/KafkaOptions.java
new file mode 100644
index 0000000000..6962eb6948
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/KafkaOptions.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.kafka;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+/** Option utils for Kafka table source sink. */
+public class KafkaOptions {
+
+    private KafkaOptions() {
+    }
+
+    public static final ConfigOption<String> SINK_MULTIPLE_PARTITION_PATTERN =
+            ConfigOptions.key("sink.multiple.partition-pattern")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "option 'sink.multiple.partition-pattern' used either when the partitioner is raw-hash, or when passing in designated partition field names for custom field partitions");
+
+    public static final ConfigOption<String> SINK_FIXED_IDENTIFIER =
+            ConfigOptions.key("sink.fixed.identifier")
+                    .stringType()
+                    .defaultValue("-1");
+
+    // --------------------------------------------------------------------------------------------
+    // Sink specific options
+    // --------------------------------------------------------------------------------------------
+    public static final ConfigOption<Boolean> KAFKA_IGNORE_ALL_CHANGELOG =
+            ConfigOptions.key("sink.ignore.changelog")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Regard upsert delete as insert kind.");
+
+}
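
These are ordinary Flink ConfigOptions, so they resolve against table options like any other
connector option. A minimal sketch of reading them from a Flink Configuration; the option values
below are made up:

import org.apache.flink.configuration.Configuration;

import org.apache.inlong.sort.kafka.KafkaOptions;

public class KafkaOptionsSketch {

    public static void main(String[] args) {
        // Simulate what a user would put in the table's WITH (...) clause.
        Configuration tableOptions = new Configuration();
        tableOptions.setString("sink.ignore.changelog", "true");

        // Unset options fall back to their declared defaults (false and "-1" here).
        boolean ignoreChangelog = tableOptions.get(KafkaOptions.KAFKA_IGNORE_ALL_CHANGELOG);
        String fixedIdentifier = tableOptions.get(KafkaOptions.SINK_FIXED_IDENTIFIER);

        System.out.println(ignoreChangelog + " " + fixedIdentifier);
    }
}
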
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaConnectorOptionsUtil.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaConnectorOptionsUtil.java
new file mode 100644
index 0000000000..06cf40ea49
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaConnectorOptionsUtil.java
@@ -0,0 +1,585 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.kafka.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.regex.Pattern;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.DELIVERY_GUARANTEE;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FIELDS;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FIELDS_PREFIX;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FORMAT;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_MODE;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_SPECIFIC_OFFSETS;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_TIMESTAMP_MILLIS;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SINK_PARTITIONER;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TOPIC;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TOPIC_PATTERN;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TRANSACTIONAL_ID_PREFIX;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.VALUE_FIELDS_INCLUDE;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.VALUE_FORMAT;
+import static org.apache.flink.table.factories.FactoryUtil.FORMAT;
+
+/** Utilities for {@link KafkaConnectorOptions}. */
+@Internal
+class KafkaConnectorOptionsUtil {
+
+    private static final ConfigOption<String> SCHEMA_REGISTRY_SUBJECT =
+            ConfigOptions.key("schema-registry.subject").stringType().noDefaultValue();
+
+    // --------------------------------------------------------------------------------------------
+    // Option enumerations
+    // --------------------------------------------------------------------------------------------
+
+    // Prefix for Kafka specific properties.
+    public static final String PROPERTIES_PREFIX = "properties.";
+
+    // Sink partitioner.
+    public static final String SINK_PARTITIONER_VALUE_DEFAULT = "default";
+    public static final String SINK_PARTITIONER_VALUE_FIXED = "fixed";
+    public static final String SINK_PARTITIONER_VALUE_ROUND_ROBIN = "round-robin";
+
+    // Other keywords.
+    private static final String PARTITION = "partition";
+    private static final String OFFSET = "offset";
+    protected static final String AVRO_CONFLUENT = "avro-confluent";
+    protected static final String DEBEZIUM_AVRO_CONFLUENT = "debezium-avro-confluent";
+    private static final List<String> SCHEMA_REGISTRY_FORMATS =
+            Arrays.asList(AVRO_CONFLUENT, DEBEZIUM_AVRO_CONFLUENT);
+
+    // --------------------------------------------------------------------------------------------
+    // Validation
+    // --------------------------------------------------------------------------------------------
+
+    public static void validateTableSourceOptions(ReadableConfig tableOptions) {
+        validateSourceTopic(tableOptions);
+        validateScanStartupMode(tableOptions);
+    }
+
+    public static void validateTableSinkOptions(ReadableConfig tableOptions) {
+        validateSinkTopic(tableOptions);
+        validateSinkPartitioner(tableOptions);
+    }
+
+    public static void validateSourceTopic(ReadableConfig tableOptions) {
+        Optional<List<String>> topic = tableOptions.getOptional(TOPIC);
+        Optional<String> pattern = tableOptions.getOptional(TOPIC_PATTERN);
+
+        if (topic.isPresent() && pattern.isPresent()) {
+            throw new ValidationException(
+                    "Option 'topic' and 'topic-pattern' shouldn't be set together.");
+        }
+
+        if (!topic.isPresent() && !pattern.isPresent()) {
+            throw new ValidationException("Either 'topic' or 'topic-pattern' must be set.");
+        }
+    }
+
+    public static void validateSinkTopic(ReadableConfig tableOptions) {
+        String errorMessageTemp =
+                "Flink Kafka sink currently only supports single topic, but got %s: %s.";
+        if (!isSingleTopic(tableOptions)) {
+            if (tableOptions.getOptional(TOPIC_PATTERN).isPresent()) {
+                throw new ValidationException(
+                        String.format(
+                                errorMessageTemp,
+                                "'topic-pattern'",
+                                tableOptions.get(TOPIC_PATTERN)));
+            } else {
+                throw new ValidationException(
+                        String.format(errorMessageTemp, "'topic'", tableOptions.get(TOPIC)));
+            }
+        }
+    }
+
+    private static void validateScanStartupMode(ReadableConfig tableOptions) {
+        tableOptions
+                .getOptional(SCAN_STARTUP_MODE)
+                .ifPresent(
+                        mode -> {
+                            switch (mode) {
+                                case TIMESTAMP:
+                                    if (!tableOptions
+                                            .getOptional(SCAN_STARTUP_TIMESTAMP_MILLIS)
+                                            .isPresent()) {
+                                        throw new ValidationException(
+                                                String.format(
+                                                        "'%s' is required in '%s' startup mode"
+                                                                + " but missing.",
+                                                        SCAN_STARTUP_TIMESTAMP_MILLIS.key(),
+                                                        KafkaConnectorOptions.ScanStartupMode.TIMESTAMP));
+                                    }
+
+                                    break;
+                                case SPECIFIC_OFFSETS:
+                                    if (!tableOptions
+                                            .getOptional(SCAN_STARTUP_SPECIFIC_OFFSETS)
+                                            .isPresent()) {
+                                        throw new ValidationException(
+                                                String.format(
+                                                        "'%s' is required in '%s' startup mode"
+                                                                + " but missing.",
+                                                        SCAN_STARTUP_SPECIFIC_OFFSETS.key(),
+                                                        KafkaConnectorOptions.ScanStartupMode.SPECIFIC_OFFSETS));
+                                    }
+                                    if (!isSingleTopic(tableOptions)) {
+                                        throw new ValidationException(
+                                                "Currently Kafka source only supports specific offset for single topic.");
+                                    }
+                                    String specificOffsets =
+                                            tableOptions.get(SCAN_STARTUP_SPECIFIC_OFFSETS);
+                                    parseSpecificOffsets(
+                                            specificOffsets, SCAN_STARTUP_SPECIFIC_OFFSETS.key());
+
+                                    break;
+                            }
+                        });
+    }
+
+    private static void validateSinkPartitioner(ReadableConfig tableOptions) {
+        tableOptions
+                .getOptional(SINK_PARTITIONER)
+                .ifPresent(
+                        partitioner -> {
+                            if (partitioner.equals(SINK_PARTITIONER_VALUE_ROUND_ROBIN)
+                                    && tableOptions.getOptional(KEY_FIELDS).isPresent()) {
+                                throw new ValidationException(
+                                        "Currently 'round-robin' partitioner only works when option 'key.fields' is not specified.");
+                            } else if (partitioner.isEmpty()) {
+                                throw new ValidationException(
+                                        String.format(
+                                                "Option '%s' should be a non-empty string.",
+                                                SINK_PARTITIONER.key()));
+                            }
+                        });
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Utilities
+    // --------------------------------------------------------------------------------------------
+
+    public static List<String> getSourceTopics(ReadableConfig tableOptions) {
+        return tableOptions.getOptional(TOPIC).orElse(null);
+    }
+
+    public static Pattern getSourceTopicPattern(ReadableConfig tableOptions) {
+        return tableOptions.getOptional(TOPIC_PATTERN).map(Pattern::compile).orElse(null);
+    }
+
+    private static boolean isSingleTopic(ReadableConfig tableOptions) {
+        // Option 'topic-pattern' is regarded as multi-topics.
+        return tableOptions.getOptional(TOPIC).map(t -> t.size() == 1).orElse(false);
+    }
+
+    /**
+     * Parses SpecificOffsets String to Map.
+     *
+     * <p>The SpecificOffsets String format is as follows:
+     *
+     * <pre>
+     *     scan.startup.specific-offsets = partition:0,offset:42;partition:1,offset:300
+     * </pre>
+     *
+     * @return SpecificOffsets with Map format, key is partition, and value is offset
+     */
+    public static Map<Integer, Long> parseSpecificOffsets(
+            String specificOffsetsStr, String optionKey) {
+        final Map<Integer, Long> offsetMap = new HashMap<>();
+        final String[] pairs = specificOffsetsStr.split(";");
+        final String validationExceptionMessage =
+                String.format(
+                        "Invalid properties '%s' should follow the format "
+                                + "'partition:0,offset:42;partition:1,offset:300', but is '%s'.",
+                        optionKey, specificOffsetsStr);
+
+        if (pairs.length == 0) {
+            throw new ValidationException(validationExceptionMessage);
+        }
+
+        for (String pair : pairs) {
+            if (null == pair || pair.length() == 0 || !pair.contains(",")) {
+                throw new ValidationException(validationExceptionMessage);
+            }
+
+            final String[] kv = pair.split(",");
+            if (kv.length != 2
+                    || !kv[0].startsWith(PARTITION + ':')
+                    || !kv[1].startsWith(OFFSET + ':')) {
+                throw new ValidationException(validationExceptionMessage);
+            }
+
+            String partitionValue = kv[0].substring(kv[0].indexOf(":") + 1);
+            String offsetValue = kv[1].substring(kv[1].indexOf(":") + 1);
+            try {
+                final Integer partition = Integer.valueOf(partitionValue);
+                final Long offset = Long.valueOf(offsetValue);
+                offsetMap.put(partition, offset);
+            } catch (NumberFormatException e) {
+                throw new ValidationException(validationExceptionMessage, e);
+            }
+        }
+        return offsetMap;
+    }
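
A short standalone usage sketch of the parser above; it has to sit in the same package because the
class is package-private, and the expected result is inferred from the format documented in the
javadoc:

package org.apache.inlong.sort.kafka.table;

import java.util.Map;

public class SpecificOffsetsSketch {

    public static void main(String[] args) {
        // Same format documented for 'scan.startup.specific-offsets'.
        String value = "partition:0,offset:42;partition:1,offset:300";
        Map<Integer, Long> offsets =
                KafkaConnectorOptionsUtil.parseSpecificOffsets(value, "scan.startup.specific-offsets");
        System.out.println(offsets); // expected: {0=42, 1=300}
    }
}
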
+
+    public static StartupOptions getStartupOptions(ReadableConfig tableOptions) {
+        final Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>();
+        final StartupMode startupMode =
+                tableOptions
+                        .getOptional(SCAN_STARTUP_MODE)
+                        .map(KafkaConnectorOptionsUtil::fromOption)
+                        .orElse(StartupMode.GROUP_OFFSETS);
+        if (startupMode == StartupMode.SPECIFIC_OFFSETS) {
+            // It will be refactored after support specific offset for multiple topics in
+            // FLINK-18602. We have already checked tableOptions.get(TOPIC) contains one topic in
+            // validateScanStartupMode().
+            buildSpecificOffsets(tableOptions, tableOptions.get(TOPIC).get(0), specificOffsets);
+        }
+
+        final StartupOptions options = new StartupOptions();
+        options.startupMode = startupMode;
+        options.specificOffsets = specificOffsets;
+        if (startupMode == StartupMode.TIMESTAMP) {
+            options.startupTimestampMillis = tableOptions.get(SCAN_STARTUP_TIMESTAMP_MILLIS);
+        }
+        return options;
+    }
+
+    private static void buildSpecificOffsets(
+            ReadableConfig tableOptions,
+            String topic,
+            Map<KafkaTopicPartition, Long> specificOffsets) {
+        String specificOffsetsStrOpt = tableOptions.get(SCAN_STARTUP_SPECIFIC_OFFSETS);
+        final Map<Integer, Long> offsetMap =
+                parseSpecificOffsets(specificOffsetsStrOpt, SCAN_STARTUP_SPECIFIC_OFFSETS.key());
+        offsetMap.forEach(
+                (partition, offset) -> {
+                    final KafkaTopicPartition topicPartition =
+                            new KafkaTopicPartition(topic, partition);
+                    specificOffsets.put(topicPartition, offset);
+                });
+    }
+
+    /**
+     * Returns the {@link StartupMode} of Kafka Consumer by passed-in table-specific {@link
+     * KafkaConnectorOptions.ScanStartupMode}.
+     */
+    private static StartupMode fromOption(KafkaConnectorOptions.ScanStartupMode scanStartupMode) {
+        switch (scanStartupMode) {
+            case EARLIEST_OFFSET:
+                return StartupMode.EARLIEST;
+            case LATEST_OFFSET:
+                return StartupMode.LATEST;
+            case GROUP_OFFSETS:
+                return StartupMode.GROUP_OFFSETS;
+            case SPECIFIC_OFFSETS:
+                return StartupMode.SPECIFIC_OFFSETS;
+            case TIMESTAMP:
+                return StartupMode.TIMESTAMP;
+
+            default:
+                throw new TableException(
+                        "Unsupported startup mode. Validator should have 
checked that.");
+        }
+    }
+
+    static void validateDeliveryGuarantee(ReadableConfig tableOptions) {
+        if (tableOptions.get(DELIVERY_GUARANTEE) == DeliveryGuarantee.EXACTLY_ONCE
+                && !tableOptions.getOptional(TRANSACTIONAL_ID_PREFIX).isPresent()) {
+            throw new ValidationException(
+                    TRANSACTIONAL_ID_PREFIX.key()
+                            + " must be specified when using DeliveryGuarantee.EXACTLY_ONCE.");
+        }
+    }
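
A hedged sketch of an option set that satisfies the exactly-once check above (same package as the
utility because it is package-private; the transactional id prefix value is arbitrary):

package org.apache.inlong.sort.kafka.table;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions;

public class DeliveryGuaranteeSketch {

    public static void main(String[] args) {
        Configuration tableOptions = new Configuration();
        tableOptions.set(KafkaConnectorOptions.DELIVERY_GUARANTEE, DeliveryGuarantee.EXACTLY_ONCE);
        // Without this prefix validateDeliveryGuarantee(...) throws a ValidationException.
        tableOptions.set(KafkaConnectorOptions.TRANSACTIONAL_ID_PREFIX, "inlong-sort-kafka");

        KafkaConnectorOptionsUtil.validateDeliveryGuarantee(tableOptions);
    }
}
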
+
+    /**
+     * Creates an array of indices that determine which physical fields of the table schema to
+     * include in the key format and the order that those fields have in the key format.
+     *
+     * <p>See {@link KafkaConnectorOptions#KEY_FORMAT}, {@link KafkaConnectorOptions#KEY_FIELDS},
+     * and {@link KafkaConnectorOptions#KEY_FIELDS_PREFIX} for more information.
+     */
+    public static int[] createKeyFormatProjection(
+            ReadableConfig options, DataType physicalDataType) {
+        final LogicalType physicalType = physicalDataType.getLogicalType();
+        Preconditions.checkArgument(
+                physicalType.is(LogicalTypeRoot.ROW), "Row data type expected.");
+        final Optional<String> optionalKeyFormat = options.getOptional(KEY_FORMAT);
+        final Optional<List<String>> optionalKeyFields = options.getOptional(KEY_FIELDS);
+
+        if (!optionalKeyFormat.isPresent() && optionalKeyFields.isPresent()) {
+            throw new ValidationException(
+                    String.format(
+                            "The option '%s' can only be declared if a key format is defined using '%s'.",
+                            KEY_FIELDS.key(), KEY_FORMAT.key()));
+        } else if (optionalKeyFormat.isPresent()
+                && (!optionalKeyFields.isPresent() || optionalKeyFields.get().size() == 0)) {
+            throw new ValidationException(
+                    String.format(
+                            "A key format '%s' requires the declaration of one or more of key fields using '%s'.",
+                            KEY_FORMAT.key(), KEY_FIELDS.key()));
+        }
+
+        if (!optionalKeyFormat.isPresent()) {
+            return new int[0];
+        }
+
+        final String keyPrefix = options.getOptional(KEY_FIELDS_PREFIX).orElse("");
+
+        final List<String> keyFields = optionalKeyFields.get();
+        final List<String> physicalFields = LogicalTypeChecks.getFieldNames(physicalType);
+        return keyFields.stream()
+                .mapToInt(
+                        keyField -> {
+                            final int pos = physicalFields.indexOf(keyField);
+                            // check that field name exists
+                            if (pos < 0) {
+                                throw new ValidationException(
+                                        String.format(
+                                                "Could not find the field '%s' in the table schema for usage in the key format. "
+                                                        + "A key field must be a regular, physical column. "
+                                                        + "The following columns can be selected in the '%s' option:\n"
+                                                        + "%s",
+                                                keyField, KEY_FIELDS.key(), physicalFields));
+                            }
+                            // check that field name is prefixed correctly
+                            if (!keyField.startsWith(keyPrefix)) {
+                                throw new ValidationException(
+                                        String.format(
+                                                "All fields in '%s' must be prefixed with '%s' when option '%s' "
+                                                        + "is set but field '%s' is not prefixed.",
+                                                KEY_FIELDS.key(),
+                                                keyPrefix,
+                                                KEY_FIELDS_PREFIX.key(),
+                                                keyField));
+                            }
+                            return pos;
+                        })
+                .toArray();
+    }
+
+    /**
+     * Creates an array of indices that determine which physical fields of the table schema to
+     * include in the value format.
+     *
+     * <p>See {@link KafkaConnectorOptions#VALUE_FORMAT}, {@link
+     * KafkaConnectorOptions#VALUE_FIELDS_INCLUDE}, and {@link
+     * KafkaConnectorOptions#KEY_FIELDS_PREFIX} for more information.
+     */
+    public static int[] createValueFormatProjection(
+            ReadableConfig options, DataType physicalDataType) {
+        final LogicalType physicalType = physicalDataType.getLogicalType();
+        Preconditions.checkArgument(
+                physicalType.is(LogicalTypeRoot.ROW), "Row data type expected.");
+        final int physicalFieldCount = LogicalTypeChecks.getFieldCount(physicalType);
+        final IntStream physicalFields = IntStream.range(0, physicalFieldCount);
+
+        final String keyPrefix = options.getOptional(KEY_FIELDS_PREFIX).orElse("");
+
+        final KafkaConnectorOptions.ValueFieldsStrategy strategy = options.get(VALUE_FIELDS_INCLUDE);
+        if (strategy == KafkaConnectorOptions.ValueFieldsStrategy.ALL) {
+            if (keyPrefix.length() > 0) {
+                throw new ValidationException(
+                        String.format(
+                                "A key prefix is not allowed when option '%s' is set to '%s'. "
+                                        + "Set it to '%s' instead to avoid field overlaps.",
+                                VALUE_FIELDS_INCLUDE.key(),
+                                KafkaConnectorOptions.ValueFieldsStrategy.ALL,
+                                KafkaConnectorOptions.ValueFieldsStrategy.EXCEPT_KEY));
+            }
+            return physicalFields.toArray();
+        } else if (strategy == KafkaConnectorOptions.ValueFieldsStrategy.EXCEPT_KEY) {
+            final int[] keyProjection = createKeyFormatProjection(options, physicalDataType);
+            return physicalFields
+                    .filter(pos -> IntStream.of(keyProjection).noneMatch(k -> k == pos))
+                    .toArray();
+        }
+        throw new TableException("Unknown value fields strategy:" + strategy);
+    }
+
+    /**
+     * Returns a new table context with a default schema registry subject value in the options if
+     * the format is a schema registry format (e.g. 'avro-confluent') and the subject is not
+     * defined.
+     */
+    public static DynamicTableFactory.Context autoCompleteSchemaRegistrySubject(
+            DynamicTableFactory.Context context) {
+        Map<String, String> tableOptions = 
context.getCatalogTable().getOptions();
+        Map<String, String> newOptions = 
autoCompleteSchemaRegistrySubject(tableOptions);
+        if (newOptions.size() > tableOptions.size()) {
+            // build a new context
+            return new FactoryUtil.DefaultDynamicTableContext(
+                    context.getObjectIdentifier(),
+                    context.getCatalogTable().copy(newOptions),
+                    context.getEnrichmentOptions(),
+                    context.getConfiguration(),
+                    context.getClassLoader(),
+                    context.isTemporary());
+        } else {
+            return context;
+        }
+    }
+
+    private static Map<String, String> autoCompleteSchemaRegistrySubject(
+            Map<String, String> options) {
+        Configuration configuration = Configuration.fromMap(options);
+        // the subject autoComplete should only be used in sink, check the 
topic first
+        validateSinkTopic(configuration);
+        final Optional<String> valueFormat = 
configuration.getOptional(VALUE_FORMAT);
+        final Optional<String> keyFormat = 
configuration.getOptional(KEY_FORMAT);
+        final Optional<String> format = configuration.getOptional(FORMAT);
+        final String topic = configuration.get(TOPIC).get(0);
+
+        if (format.isPresent() && 
SCHEMA_REGISTRY_FORMATS.contains(format.get())) {
+            autoCompleteSubject(configuration, format.get(), topic + "-value");
+        } else if (valueFormat.isPresent() && 
SCHEMA_REGISTRY_FORMATS.contains(valueFormat.get())) {
+            autoCompleteSubject(configuration, "value." + valueFormat.get(), 
topic + "-value");
+        }
+
+        if (keyFormat.isPresent() && 
SCHEMA_REGISTRY_FORMATS.contains(keyFormat.get())) {
+            autoCompleteSubject(configuration, "key." + keyFormat.get(), topic 
+ "-key");
+        }
+        return configuration.toMap();
+    }
+
+    private static void autoCompleteSubject(
+            Configuration configuration, String format, String subject) {
+        ConfigOption<String> subjectOption =
+                ConfigOptions.key(format + "." + SCHEMA_REGISTRY_SUBJECT.key())
+                        .stringType()
+                        .noDefaultValue();
+        if (!configuration.getOptional(subjectOption).isPresent()) {
+            configuration.setString(subjectOption, subject);
+        }
+    }
+
+    /**
+     * The partitioner can be either "fixed", "round-robin", or the fully qualified class name
+     * of a custom partitioner.
+     */
+    public static Optional<FlinkKafkaPartitioner<RowData>> 
getFlinkKafkaPartitioner(
+            ReadableConfig tableOptions, ClassLoader classLoader) {
+        return tableOptions
+                .getOptional(SINK_PARTITIONER)
+                .flatMap(
+                        (String partitioner) -> {
+                            switch (partitioner) {
+                                case SINK_PARTITIONER_VALUE_FIXED:
+                                    return Optional.of(new 
FlinkFixedPartitioner<>());
+                                case SINK_PARTITIONER_VALUE_DEFAULT:
+                                case SINK_PARTITIONER_VALUE_ROUND_ROBIN:
+                                    return Optional.empty();
+                                // Default fallback to full class name of the 
partitioner.
+                                default:
+                                    return Optional.of(
+                                            initializePartitioner(partitioner, 
classLoader));
+                            }
+                        });
+    }
+
+    /** Returns a class value with the given class name. */
+    private static <T> FlinkKafkaPartitioner<T> initializePartitioner(
+            String name, ClassLoader classLoader) {
+        try {
+            Class<?> clazz = Class.forName(name, true, classLoader);
+            if (!FlinkKafkaPartitioner.class.isAssignableFrom(clazz)) {
+                throw new ValidationException(
+                        String.format(
+                                "Sink partitioner class '%s' should extend 
from the required class %s",
+                                name, FlinkKafkaPartitioner.class.getName()));
+            }
+            @SuppressWarnings("unchecked")
+            final FlinkKafkaPartitioner<T> kafkaPartitioner =
+                    InstantiationUtil.instantiate(name, 
FlinkKafkaPartitioner.class, classLoader);
+
+            return kafkaPartitioner;
+        } catch (ClassNotFoundException | FlinkException e) {
+            throw new ValidationException(
+                    String.format("Could not find and instantiate partitioner 
class '%s'", name),
+                    e);
+        }
+    }
+
+    public static Properties getKafkaProperties(Map<String, String> 
tableOptions) {
+        final Properties kafkaProperties = new Properties();
+
+        if (hasKafkaClientProperties(tableOptions)) {
+            tableOptions.keySet().stream()
+                    .filter(key -> key.startsWith(PROPERTIES_PREFIX))
+                    .forEach(
+                            key -> {
+                                final String value = tableOptions.get(key);
+                                final String subKey = 
key.substring((PROPERTIES_PREFIX).length());
+                                kafkaProperties.put(subKey, value);
+                            });
+        }
+        return kafkaProperties;
+    }
+
+    /**
+     * Decides whether the table options contain Kafka client properties that start with the
+     * prefix 'properties'.
+     */
+    private static boolean hasKafkaClientProperties(Map<String, String> 
tableOptions) {
+        return tableOptions.keySet().stream().anyMatch(k -> 
k.startsWith(PROPERTIES_PREFIX));
+    }
+
+    // 
--------------------------------------------------------------------------------------------
+    // Inner classes
+    // 
--------------------------------------------------------------------------------------------
+
+    /** Kafka startup options. */
+    public static class StartupOptions {
+
+        public StartupMode startupMode;
+        public Map<KafkaTopicPartition, Long> specificOffsets;
+        public long startupTimestampMillis;
+    }
+
+    private KafkaConnectorOptionsUtil() {
+    }
+}
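
A quick illustration of the helpers above may be useful. The following is a minimal, hedged sketch (not part of the patch) showing how createKeyFormatProjection and createValueFormatProjection split a physical row between key and value formats when 'value.fields-include' is 'EXCEPT_KEY'; the column names, the 'k_' prefix, and the 'json' key format are assumptions chosen only for the example.

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.types.DataType;

    import org.apache.inlong.sort.kafka.table.KafkaConnectorOptionsUtil;

    public class ProjectionSketch {
        public static void main(String[] args) {
            Configuration options = new Configuration();
            options.setString("key.format", "json");                 // a key format must be declared for key.fields
            options.setString("key.fields", "k_id");                 // physical column used as the Kafka message key
            options.setString("key.fields.prefix", "k_");            // every key field must carry this prefix
            options.setString("value.fields-include", "EXCEPT_KEY"); // exclude key fields from the value format

            DataType physicalRow =
                    DataTypes.ROW(
                            DataTypes.FIELD("k_id", DataTypes.BIGINT()),
                            DataTypes.FIELD("name", DataTypes.STRING()),
                            DataTypes.FIELD("price", DataTypes.DOUBLE()));

            int[] keyProjection =
                    KafkaConnectorOptionsUtil.createKeyFormatProjection(options, physicalRow);
            int[] valueProjection =
                    KafkaConnectorOptionsUtil.createValueFormatProjection(options, physicalRow);

            // keyProjection   -> [0]    : only 'k_id' feeds the key format
            // valueProjection -> [1, 2] : 'name' and 'price' feed the value format
            System.out.println(java.util.Arrays.toString(keyProjection));
            System.out.println(java.util.Arrays.toString(valueProjection));
        }
    }

Any 'properties.*' table option is handed to the Kafka client unchanged via getKafkaProperties, with the 'properties.' prefix stripped.
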
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
new file mode 100644
index 0000000000..c87735ffc8
--- /dev/null
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.kafka.table;
+
+import org.apache.inlong.sort.base.Constants;
+import org.apache.inlong.sort.kafka.KafkaOptions;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.kafka.source.KafkaSourceOptions;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSink;
+import org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource;
+import org.apache.flink.streaming.connectors.kafka.table.SinkBufferFlushMode;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.format.Format;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DeserializationFormatFactory;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.FactoryUtil.TableFactoryHelper;
+import org.apache.flink.table.factories.SerializationFormatFactory;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.RowKind;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.DELIVERY_GUARANTEE;
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FIELDS;
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FIELDS_PREFIX;
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FORMAT;
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS;
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.PROPS_GROUP_ID;
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_MODE;
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_SPECIFIC_OFFSETS;
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_TIMESTAMP_MILLIS;
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_TOPIC_PARTITION_DISCOVERY;
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SINK_PARALLELISM;
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SINK_PARTITIONER;
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TOPIC;
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TOPIC_PATTERN;
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TRANSACTIONAL_ID_PREFIX;
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.VALUE_FIELDS_INCLUDE;
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.VALUE_FORMAT;
+import static org.apache.flink.table.factories.FactoryUtil.FORMAT;
+import static 
org.apache.inlong.sort.kafka.table.KafkaConnectorOptionsUtil.PROPERTIES_PREFIX;
+import static 
org.apache.inlong.sort.kafka.table.KafkaConnectorOptionsUtil.StartupOptions;
+import static 
org.apache.inlong.sort.kafka.table.KafkaConnectorOptionsUtil.autoCompleteSchemaRegistrySubject;
+import static 
org.apache.inlong.sort.kafka.table.KafkaConnectorOptionsUtil.createKeyFormatProjection;
+import static 
org.apache.inlong.sort.kafka.table.KafkaConnectorOptionsUtil.createValueFormatProjection;
+import static 
org.apache.inlong.sort.kafka.table.KafkaConnectorOptionsUtil.getFlinkKafkaPartitioner;
+import static 
org.apache.inlong.sort.kafka.table.KafkaConnectorOptionsUtil.getKafkaProperties;
+import static 
org.apache.inlong.sort.kafka.table.KafkaConnectorOptionsUtil.getSourceTopicPattern;
+import static 
org.apache.inlong.sort.kafka.table.KafkaConnectorOptionsUtil.getSourceTopics;
+import static 
org.apache.inlong.sort.kafka.table.KafkaConnectorOptionsUtil.getStartupOptions;
+import static 
org.apache.inlong.sort.kafka.table.KafkaConnectorOptionsUtil.validateTableSinkOptions;
+import static 
org.apache.inlong.sort.kafka.table.KafkaConnectorOptionsUtil.validateTableSourceOptions;
+
+/**
+ * Factory for creating configured instances of {@link KafkaDynamicSource} and 
{@link
+ * KafkaDynamicSink}.
+ */
+@Internal
+public class KafkaDynamicTableFactory implements DynamicTableSourceFactory, 
DynamicTableSinkFactory {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(KafkaDynamicTableFactory.class);
+
+    private static final ConfigOption<String> SINK_SEMANTIC =
+            ConfigOptions.key("sink.semantic")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Optional semantic when committing.");
+
+    public static final String IDENTIFIER = "kafka-inlong";
+
+    @Override
+    public String factoryIdentifier() {
+        return IDENTIFIER;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+        final Set<ConfigOption<?>> options = new HashSet<>();
+        options.add(PROPS_BOOTSTRAP_SERVERS);
+        return options;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        final Set<ConfigOption<?>> options = new HashSet<>();
+        options.add(FORMAT);
+        options.add(KEY_FORMAT);
+        options.add(KEY_FIELDS);
+        options.add(KEY_FIELDS_PREFIX);
+        options.add(VALUE_FORMAT);
+        options.add(VALUE_FIELDS_INCLUDE);
+        options.add(TOPIC);
+        options.add(TOPIC_PATTERN);
+        options.add(PROPS_GROUP_ID);
+        options.add(SCAN_STARTUP_MODE);
+        options.add(SCAN_STARTUP_SPECIFIC_OFFSETS);
+        options.add(SCAN_TOPIC_PARTITION_DISCOVERY);
+        options.add(SCAN_STARTUP_TIMESTAMP_MILLIS);
+        options.add(SINK_PARTITIONER);
+        options.add(SINK_PARALLELISM);
+        options.add(DELIVERY_GUARANTEE);
+        options.add(TRANSACTIONAL_ID_PREFIX);
+        options.add(SINK_SEMANTIC);
+        options.add(Constants.INLONG_METRIC);
+        options.add(Constants.INLONG_AUDIT);
+        options.add(Constants.AUDIT_KEYS);
+        options.add(Constants.SINK_MULTIPLE_FORMAT);
+        options.add(Constants.PATTERN_PARTITION_MAP);
+        options.add(Constants.DATASOURCE_PARTITION_MAP);
+        options.add(Constants.SINK_SCHEMA_CHANGE_ENABLE);
+        options.add(Constants.SINK_SCHEMA_CHANGE_POLICIES);
+        options.add(KafkaOptions.KAFKA_IGNORE_ALL_CHANGELOG);
+        options.add(KafkaOptions.SINK_MULTIPLE_PARTITION_PATTERN);
+        options.add(KafkaOptions.SINK_FIXED_IDENTIFIER);
+        return options;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> forwardOptions() {
+        return Stream.of(
+                PROPS_BOOTSTRAP_SERVERS,
+                PROPS_GROUP_ID,
+                TOPIC,
+                TOPIC_PATTERN,
+                SCAN_STARTUP_MODE,
+                SCAN_STARTUP_SPECIFIC_OFFSETS,
+                SCAN_TOPIC_PARTITION_DISCOVERY,
+                SCAN_STARTUP_TIMESTAMP_MILLIS,
+                SINK_PARTITIONER,
+                SINK_PARALLELISM,
+                TRANSACTIONAL_ID_PREFIX)
+                .collect(Collectors.toSet());
+    }
+
+    @Override
+    public DynamicTableSource createDynamicTableSource(Context context) {
+        final TableFactoryHelper helper = 
FactoryUtil.createTableFactoryHelper(this, context);
+
+        final Optional<DecodingFormat<DeserializationSchema<RowData>>> 
keyDecodingFormat =
+                getKeyDecodingFormat(helper);
+
+        final DecodingFormat<DeserializationSchema<RowData>> 
valueDecodingFormat =
+                getValueDecodingFormat(helper);
+
+        final ReadableConfig tableOptions = helper.getOptions();
+
+        final String valueFormatPrefix = 
tableOptions.getOptional(FORMAT).orElse(tableOptions.get(VALUE_FORMAT));
+        LOG.info("valueFormatPrefix is {}", valueFormatPrefix);
+        helper.validateExcept(PROPERTIES_PREFIX, Constants.DIRTY_PREFIX, 
valueFormatPrefix);
+
+        validateTableSourceOptions(tableOptions);
+
+        validatePKConstraints(
+                context.getObjectIdentifier(),
+                context.getPrimaryKeyIndexes(),
+                context.getCatalogTable().getOptions(),
+                valueDecodingFormat);
+
+        final StartupOptions startupOptions = getStartupOptions(tableOptions);
+
+        final Properties properties = 
getKafkaProperties(context.getCatalogTable().getOptions());
+
+        // add topic-partition discovery
+        final Optional<Long> partitionDiscoveryInterval =
+                
tableOptions.getOptional(SCAN_TOPIC_PARTITION_DISCOVERY).map(Duration::toMillis);
+        properties.setProperty(
+                KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS.key(),
+                partitionDiscoveryInterval.orElse(-1L).toString());
+
+        final DataType physicalDataType = context.getPhysicalRowDataType();
+
+        final int[] keyProjection = createKeyFormatProjection(tableOptions, 
physicalDataType);
+
+        final int[] valueProjection = 
createValueFormatProjection(tableOptions, physicalDataType);
+
+        final String keyPrefix = 
tableOptions.getOptional(KEY_FIELDS_PREFIX).orElse(null);
+
+        return createKafkaTableSource(
+                physicalDataType,
+                keyDecodingFormat.orElse(null),
+                valueDecodingFormat,
+                keyProjection,
+                valueProjection,
+                keyPrefix,
+                getSourceTopics(tableOptions),
+                getSourceTopicPattern(tableOptions),
+                properties,
+                startupOptions.startupMode,
+                startupOptions.specificOffsets,
+                startupOptions.startupTimestampMillis,
+                context.getObjectIdentifier().asSummaryString());
+    }
+
+    @Override
+    public DynamicTableSink createDynamicTableSink(Context context) {
+        final TableFactoryHelper helper =
+                FactoryUtil.createTableFactoryHelper(
+                        this, autoCompleteSchemaRegistrySubject(context));
+
+        final Optional<EncodingFormat<SerializationSchema<RowData>>> 
keyEncodingFormat =
+                getKeyEncodingFormat(helper);
+
+        final EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat 
=
+                getValueEncodingFormat(helper);
+
+        helper.validateExcept(KafkaConnectorOptionsUtil.PROPERTIES_PREFIX);
+
+        final ReadableConfig tableOptions = helper.getOptions();
+
+        final DeliveryGuarantee deliveryGuarantee = 
validateDeprecatedSemantic(tableOptions);
+        validateTableSinkOptions(tableOptions);
+
+        KafkaConnectorOptionsUtil.validateDeliveryGuarantee(tableOptions);
+
+        validatePKConstraints(
+                context.getObjectIdentifier(),
+                context.getPrimaryKeyIndexes(),
+                context.getCatalogTable().getOptions(),
+                valueEncodingFormat);
+
+        final DataType physicalDataType = context.getPhysicalRowDataType();
+
+        final int[] keyProjection = createKeyFormatProjection(tableOptions, 
physicalDataType);
+
+        final int[] valueProjection = 
createValueFormatProjection(tableOptions, physicalDataType);
+
+        final String keyPrefix = 
tableOptions.getOptional(KEY_FIELDS_PREFIX).orElse(null);
+
+        final Integer parallelism = 
tableOptions.getOptional(SINK_PARALLELISM).orElse(null);
+
+        return createKafkaTableSink(
+                physicalDataType,
+                keyEncodingFormat.orElse(null),
+                valueEncodingFormat,
+                keyProjection,
+                valueProjection,
+                keyPrefix,
+                tableOptions.get(TOPIC).get(0),
+                getKafkaProperties(context.getCatalogTable().getOptions()),
+                getFlinkKafkaPartitioner(tableOptions, 
context.getClassLoader()).orElse(null),
+                deliveryGuarantee,
+                parallelism,
+                tableOptions.get(TRANSACTIONAL_ID_PREFIX));
+    }
+
+    private static Optional<DecodingFormat<DeserializationSchema<RowData>>> 
getKeyDecodingFormat(
+            TableFactoryHelper helper) {
+        final Optional<DecodingFormat<DeserializationSchema<RowData>>> 
keyDecodingFormat =
+                helper.discoverOptionalDecodingFormat(
+                        DeserializationFormatFactory.class, KEY_FORMAT);
+        keyDecodingFormat.ifPresent(
+                format -> {
+                    if 
(!format.getChangelogMode().containsOnly(RowKind.INSERT)) {
+                        throw new ValidationException(
+                                String.format(
+                                        "A key format should only deal with 
INSERT-only records. "
+                                                + "But %s has a changelog mode 
of %s.",
+                                        helper.getOptions().get(KEY_FORMAT),
+                                        format.getChangelogMode()));
+                    }
+                });
+        return keyDecodingFormat;
+    }
+
+    private static Optional<EncodingFormat<SerializationSchema<RowData>>> 
getKeyEncodingFormat(
+            TableFactoryHelper helper) {
+        final Optional<EncodingFormat<SerializationSchema<RowData>>> 
keyEncodingFormat =
+                
helper.discoverOptionalEncodingFormat(SerializationFormatFactory.class, 
KEY_FORMAT);
+        keyEncodingFormat.ifPresent(
+                format -> {
+                    if 
(!format.getChangelogMode().containsOnly(RowKind.INSERT)) {
+                        throw new ValidationException(
+                                String.format(
+                                        "A key format should only deal with 
INSERT-only records. "
+                                                + "But %s has a changelog mode 
of %s.",
+                                        helper.getOptions().get(KEY_FORMAT),
+                                        format.getChangelogMode()));
+                    }
+                });
+        return keyEncodingFormat;
+    }
+
+    private static DecodingFormat<DeserializationSchema<RowData>> 
getValueDecodingFormat(
+            TableFactoryHelper helper) {
+        return helper.discoverOptionalDecodingFormat(
+                DeserializationFormatFactory.class, FORMAT)
+                .orElseGet(
+                        () -> helper.discoverDecodingFormat(
+                                DeserializationFormatFactory.class, 
VALUE_FORMAT));
+    }
+
+    private static EncodingFormat<SerializationSchema<RowData>> 
getValueEncodingFormat(
+            TableFactoryHelper helper) {
+        return helper.discoverOptionalEncodingFormat(
+                SerializationFormatFactory.class, FORMAT)
+                .orElseGet(
+                        () -> helper.discoverEncodingFormat(
+                                SerializationFormatFactory.class, 
VALUE_FORMAT));
+    }
+
+    private static void validatePKConstraints(
+            ObjectIdentifier tableName,
+            int[] primaryKeyIndexes,
+            Map<String, String> options,
+            Format format) {
+        if (primaryKeyIndexes.length > 0
+                && format.getChangelogMode().containsOnly(RowKind.INSERT)) {
+            Configuration configuration = Configuration.fromMap(options);
+            String formatName =
+                    configuration
+                            .getOptional(FORMAT)
+                            .orElse(configuration.get(VALUE_FORMAT));
+            throw new ValidationException(
+                    String.format(
+                            "The Kafka table '%s' with '%s' format doesn't 
support defining PRIMARY KEY constraint"
+                                    + " on the table, because it can't 
guarantee the semantic of primary key.",
+                            tableName.asSummaryString(), formatName));
+        }
+    }
+
+    private static DeliveryGuarantee validateDeprecatedSemantic(ReadableConfig 
tableOptions) {
+        if (tableOptions.getOptional(SINK_SEMANTIC).isPresent()) {
+            LOG.warn(
+                    "{} is deprecated and will be removed. Please use {} 
instead.",
+                    SINK_SEMANTIC.key(),
+                    DELIVERY_GUARANTEE.key());
+            return DeliveryGuarantee.valueOf(
+                    tableOptions.get(SINK_SEMANTIC).toUpperCase().replace("-", 
"_"));
+        }
+        return tableOptions.get(DELIVERY_GUARANTEE);
+    }
+
+    protected KafkaDynamicSource createKafkaTableSource(
+            DataType physicalDataType,
+            @Nullable DecodingFormat<DeserializationSchema<RowData>> 
keyDecodingFormat,
+            DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat,
+            int[] keyProjection,
+            int[] valueProjection,
+            @Nullable String keyPrefix,
+            @Nullable List<String> topics,
+            @Nullable Pattern topicPattern,
+            Properties properties,
+            StartupMode startupMode,
+            Map<KafkaTopicPartition, Long> specificStartupOffsets,
+            long startupTimestampMillis,
+            String tableIdentifier) {
+        return new KafkaDynamicSource(
+                physicalDataType,
+                keyDecodingFormat,
+                valueDecodingFormat,
+                keyProjection,
+                valueProjection,
+                keyPrefix,
+                topics,
+                topicPattern,
+                properties,
+                startupMode,
+                specificStartupOffsets,
+                startupTimestampMillis,
+                false,
+                tableIdentifier);
+    }
+
+    protected KafkaDynamicSink createKafkaTableSink(
+            DataType physicalDataType,
+            @Nullable EncodingFormat<SerializationSchema<RowData>> 
keyEncodingFormat,
+            EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat,
+            int[] keyProjection,
+            int[] valueProjection,
+            @Nullable String keyPrefix,
+            String topic,
+            Properties properties,
+            FlinkKafkaPartitioner<RowData> partitioner,
+            DeliveryGuarantee deliveryGuarantee,
+            Integer parallelism,
+            @Nullable String transactionalIdPrefix) {
+        return new KafkaDynamicSink(
+                physicalDataType,
+                physicalDataType,
+                keyEncodingFormat,
+                valueEncodingFormat,
+                keyProjection,
+                valueProjection,
+                keyPrefix,
+                topic,
+                properties,
+                partitioner,
+                deliveryGuarantee,
+                false,
+                SinkBufferFlushMode.DISABLED,
+                parallelism,
+                transactionalIdPrefix);
+    }
+
+}
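
For orientation, here is a minimal Table API sketch (not part of the patch) that would exercise the factory above through its 'kafka-inlong' identifier. The topic, bootstrap servers, group id, and schema are made-up values for illustration only.

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class KafkaInlongSourceSketch {
        public static void main(String[] args) {
            TableEnvironment tEnv =
                    TableEnvironment.create(EnvironmentSettings.inStreamingMode());
            // 'properties.bootstrap.servers' is the only option declared as required by the
            // factory; topic and format settings are validated separately.
            tEnv.executeSql(
                    "CREATE TABLE kafka_source (\n"
                            + "  id BIGINT,\n"
                            + "  name STRING\n"
                            + ") WITH (\n"
                            + "  'connector' = 'kafka-inlong',\n"
                            + "  'topic' = 'input_topic',\n"
                            + "  'properties.bootstrap.servers' = 'localhost:9092',\n"
                            + "  'properties.group.id' = 'inlong_group',\n"
                            + "  'scan.startup.mode' = 'earliest-offset',\n"
                            + "  'format' = 'json'\n"
                            + ")");
            tEnv.executeSql("SELECT * FROM kafka_source").print();
        }
    }
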
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/UpsertKafkaDynamicTableFactory.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/UpsertKafkaDynamicTableFactory.java
new file mode 100644
index 0000000000..2f61265c32
--- /dev/null
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/UpsertKafkaDynamicTableFactory.java
@@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.kafka.table;
+
+import org.apache.inlong.sort.base.Constants;
+import org.apache.inlong.sort.kafka.KafkaOptions;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions;
+import org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSink;
+import org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource;
+import org.apache.flink.streaming.connectors.kafka.table.SinkBufferFlushMode;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.format.Format;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DeserializationFormatFactory;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.SerializationFormatFactory;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.RowKind;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.Set;
+
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FIELDS;
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FIELDS_PREFIX;
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FORMAT;
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS;
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SINK_BUFFER_FLUSH_INTERVAL;
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SINK_BUFFER_FLUSH_MAX_ROWS;
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SINK_PARALLELISM;
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TOPIC;
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TRANSACTIONAL_ID_PREFIX;
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.VALUE_FIELDS_INCLUDE;
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.VALUE_FORMAT;
+import static 
org.apache.inlong.sort.kafka.table.KafkaConnectorOptionsUtil.PROPERTIES_PREFIX;
+import static 
org.apache.inlong.sort.kafka.table.KafkaConnectorOptionsUtil.autoCompleteSchemaRegistrySubject;
+import static 
org.apache.inlong.sort.kafka.table.KafkaConnectorOptionsUtil.createKeyFormatProjection;
+import static 
org.apache.inlong.sort.kafka.table.KafkaConnectorOptionsUtil.createValueFormatProjection;
+import static 
org.apache.inlong.sort.kafka.table.KafkaConnectorOptionsUtil.getKafkaProperties;
+import static 
org.apache.inlong.sort.kafka.table.KafkaConnectorOptionsUtil.getSourceTopicPattern;
+import static 
org.apache.inlong.sort.kafka.table.KafkaConnectorOptionsUtil.getSourceTopics;
+
+/** Upsert-Kafka factory. */
+public class UpsertKafkaDynamicTableFactory implements 
DynamicTableSourceFactory, DynamicTableSinkFactory {
+
+    public static final String IDENTIFIER = "upsert-kafka-inlong";
+
+    @Override
+    public String factoryIdentifier() {
+        return IDENTIFIER;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+        final Set<ConfigOption<?>> options = new HashSet<>();
+        options.add(PROPS_BOOTSTRAP_SERVERS);
+        options.add(TOPIC);
+        options.add(KEY_FORMAT);
+        options.add(VALUE_FORMAT);
+        return options;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        final Set<ConfigOption<?>> options = new HashSet<>();
+        options.add(KEY_FIELDS_PREFIX);
+        options.add(VALUE_FIELDS_INCLUDE);
+        options.add(SINK_PARALLELISM);
+        options.add(SINK_BUFFER_FLUSH_INTERVAL);
+        options.add(SINK_BUFFER_FLUSH_MAX_ROWS);
+        options.add(Constants.INLONG_METRIC);
+        options.add(KafkaOptions.KAFKA_IGNORE_ALL_CHANGELOG);
+        options.add(KafkaOptions.SINK_MULTIPLE_PARTITION_PATTERN);
+        options.add(KafkaOptions.SINK_FIXED_IDENTIFIER);
+        options.add(KafkaConnectorOptions.SINK_PARTITIONER);
+        return options;
+    }
+
+    @Override
+    public DynamicTableSource createDynamicTableSource(Context context) {
+        FactoryUtil.TableFactoryHelper helper = 
FactoryUtil.createTableFactoryHelper(this, context);
+
+        ReadableConfig tableOptions = helper.getOptions();
+        DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat =
+                
helper.discoverDecodingFormat(DeserializationFormatFactory.class, KEY_FORMAT);
+        DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat =
+                
helper.discoverDecodingFormat(DeserializationFormatFactory.class, VALUE_FORMAT);
+
+        // Validate the option data type.
+        helper.validateExcept(PROPERTIES_PREFIX, Constants.DIRTY_PREFIX);
+        validateSource(
+                tableOptions,
+                keyDecodingFormat,
+                valueDecodingFormat,
+                context.getPrimaryKeyIndexes());
+
+        Tuple2<int[], int[]> keyValueProjections =
+                createKeyValueProjections(context.getCatalogTable());
+        String keyPrefix = 
tableOptions.getOptional(KEY_FIELDS_PREFIX).orElse(null);
+        Properties properties = 
getKafkaProperties(context.getCatalogTable().getOptions());
+        // always use earliest to keep data integrity
+        StartupMode earliest = StartupMode.EARLIEST;
+
+        return new KafkaDynamicSource(
+                context.getPhysicalRowDataType(),
+                keyDecodingFormat,
+                new DecodingFormatWrapper(valueDecodingFormat),
+                keyValueProjections.f0,
+                keyValueProjections.f1,
+                keyPrefix,
+                getSourceTopics(tableOptions),
+                getSourceTopicPattern(tableOptions),
+                properties,
+                earliest,
+                Collections.emptyMap(),
+                0,
+                true,
+                context.getObjectIdentifier().asSummaryString());
+    }
+
+    @Override
+    public DynamicTableSink createDynamicTableSink(Context context) {
+        FactoryUtil.TableFactoryHelper helper =
+                FactoryUtil.createTableFactoryHelper(
+                        this, autoCompleteSchemaRegistrySubject(context));
+
+        final ReadableConfig tableOptions = helper.getOptions();
+
+        EncodingFormat<SerializationSchema<RowData>> keyEncodingFormat =
+                
helper.discoverEncodingFormat(SerializationFormatFactory.class, KEY_FORMAT);
+        EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat =
+                
helper.discoverEncodingFormat(SerializationFormatFactory.class, VALUE_FORMAT);
+
+        // Validate the option data type.
+        helper.validateExcept(PROPERTIES_PREFIX);
+        validateSink(
+                tableOptions,
+                keyEncodingFormat,
+                valueEncodingFormat,
+                context.getPrimaryKeyIndexes());
+
+        Tuple2<int[], int[]> keyValueProjections =
+                createKeyValueProjections(context.getCatalogTable());
+        final String keyPrefix = 
tableOptions.getOptional(KEY_FIELDS_PREFIX).orElse(null);
+        final Properties properties = 
getKafkaProperties(context.getCatalogTable().getOptions());
+
+        Integer parallelism = tableOptions.get(SINK_PARALLELISM);
+
+        int batchSize = tableOptions.get(SINK_BUFFER_FLUSH_MAX_ROWS);
+        Duration batchInterval = tableOptions.get(SINK_BUFFER_FLUSH_INTERVAL);
+        SinkBufferFlushMode flushMode =
+                new SinkBufferFlushMode(batchSize, batchInterval.toMillis());
+
+        // use {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner}:
+        // it uses hash partitioning if a key is set, otherwise round-robin behaviour.
+        return new KafkaDynamicSink(
+                context.getPhysicalRowDataType(),
+                context.getPhysicalRowDataType(),
+                keyEncodingFormat,
+                new EncodingFormatWrapper(valueEncodingFormat),
+                keyValueProjections.f0,
+                keyValueProjections.f1,
+                keyPrefix,
+                tableOptions.get(TOPIC).get(0),
+                properties,
+                null,
+                DeliveryGuarantee.AT_LEAST_ONCE,
+                true,
+                flushMode,
+                parallelism,
+                tableOptions.get(TRANSACTIONAL_ID_PREFIX));
+    }
+
+    private Tuple2<int[], int[]> 
createKeyValueProjections(ResolvedCatalogTable catalogTable) {
+        ResolvedSchema schema = catalogTable.getResolvedSchema();
+        // the primary key should have been validated earlier
+        List<String> keyFields = schema.getPrimaryKey().get().getColumns();
+        DataType physicalDataType = schema.toPhysicalRowDataType();
+
+        Configuration tableOptions = 
Configuration.fromMap(catalogTable.getOptions());
+        // upsert-kafka will set key.fields to primary key fields by default
+        tableOptions.set(KEY_FIELDS, keyFields);
+
+        int[] keyProjection = createKeyFormatProjection(tableOptions, 
physicalDataType);
+        int[] valueProjection = createValueFormatProjection(tableOptions, 
physicalDataType);
+
+        return Tuple2.of(keyProjection, valueProjection);
+    }
+
+    // 
--------------------------------------------------------------------------------------------
+    // Validation
+    // 
--------------------------------------------------------------------------------------------
+
+    private static void validateSource(
+            ReadableConfig tableOptions,
+            Format keyFormat,
+            Format valueFormat,
+            int[] primaryKeyIndexes) {
+        validateTopic(tableOptions);
+        validateFormat(keyFormat, valueFormat, tableOptions);
+        validatePKConstraints(primaryKeyIndexes);
+    }
+
+    private static void validateSink(
+            ReadableConfig tableOptions,
+            Format keyFormat,
+            Format valueFormat,
+            int[] primaryKeyIndexes) {
+        validateTopic(tableOptions);
+        validateFormat(keyFormat, valueFormat, tableOptions);
+        validatePKConstraints(primaryKeyIndexes);
+        validateSinkBufferFlush(tableOptions);
+    }
+
+    private static void validateTopic(ReadableConfig tableOptions) {
+        List<String> topic = tableOptions.get(TOPIC);
+        if (topic.size() > 1) {
+            throw new ValidationException(
+                    "The 'upsert-kafka' connector doesn't support topic list 
now. "
+                            + "Please use single topic as the value of the 
parameter 'topic'.");
+        }
+    }
+
+    private static void validateFormat(
+            Format keyFormat, Format valueFormat, ReadableConfig tableOptions) 
{
+        if (!keyFormat.getChangelogMode().containsOnly(RowKind.INSERT)) {
+            String identifier = tableOptions.get(KEY_FORMAT);
+            throw new ValidationException(
+                    String.format(
+                            "'upsert-kafka' connector doesn't support '%s' as 
key format, "
+                                    + "because '%s' is not in insert-only 
mode.",
+                            identifier, identifier));
+        }
+        if (!valueFormat.getChangelogMode().containsOnly(RowKind.INSERT)) {
+            String identifier = tableOptions.get(VALUE_FORMAT);
+            throw new ValidationException(
+                    String.format(
+                            "'upsert-kafka' connector doesn't support '%s' as 
value format, "
+                                    + "because '%s' is not in insert-only 
mode.",
+                            identifier, identifier));
+        }
+    }
+
+    private static void validatePKConstraints(int[] schema) {
+        if (schema.length == 0) {
+            throw new ValidationException(
+                    "'upsert-kafka' tables require to define a PRIMARY KEY 
constraint. "
+                            + "The PRIMARY KEY specifies which columns should 
be read from or write to the Kafka message key. "
+                            + "The PRIMARY KEY also defines records in the 
'upsert-kafka' table should update or delete on which keys.");
+        }
+    }
+
+    private static void validateSinkBufferFlush(ReadableConfig tableOptions) {
+        int flushMaxRows = tableOptions.get(SINK_BUFFER_FLUSH_MAX_ROWS);
+        long flushIntervalMs = 
tableOptions.get(SINK_BUFFER_FLUSH_INTERVAL).toMillis();
+        if (flushMaxRows > 0 && flushIntervalMs > 0) {
+            // flush is enabled
+            return;
+        }
+        if (flushMaxRows <= 0 && flushIntervalMs <= 0) {
+            // flush is disabled
+            return;
+        }
+        // one of them is set which is not allowed
+        throw new ValidationException(
+                String.format(
+                        "'%s' and '%s' must be set to be greater than zero 
together to enable sink buffer flushing.",
+                        SINK_BUFFER_FLUSH_MAX_ROWS.key(), 
SINK_BUFFER_FLUSH_INTERVAL.key()));
+    }
+
+    // 
--------------------------------------------------------------------------------------------
+    // Format wrapper
+    // 
--------------------------------------------------------------------------------------------
+
+    /**
+     * It is used to wrap the decoding format and expose the desired changelog mode. It only
+     * works for insert-only formats.
+     */
+    protected static class DecodingFormatWrapper
+            implements
+                DecodingFormat<DeserializationSchema<RowData>> {
+
+        private final DecodingFormat<DeserializationSchema<RowData>> 
innerDecodingFormat;
+
+        private static final ChangelogMode SOURCE_CHANGELOG_MODE =
+                ChangelogMode.newBuilder()
+                        .addContainedKind(RowKind.UPDATE_AFTER)
+                        .addContainedKind(RowKind.DELETE)
+                        .build();
+
+        public DecodingFormatWrapper(
+                DecodingFormat<DeserializationSchema<RowData>> 
innerDecodingFormat) {
+            this.innerDecodingFormat = innerDecodingFormat;
+        }
+
+        @Override
+        public DeserializationSchema<RowData> createRuntimeDecoder(
+                DynamicTableSource.Context context, DataType producedDataType) 
{
+            return innerDecodingFormat.createRuntimeDecoder(context, 
producedDataType);
+        }
+
+        @Override
+        public ChangelogMode getChangelogMode() {
+            return SOURCE_CHANGELOG_MODE;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (obj == this) {
+                return true;
+            }
+
+            if (obj == null || getClass() != obj.getClass()) {
+                return false;
+            }
+
+            DecodingFormatWrapper that = (DecodingFormatWrapper) obj;
+            return Objects.equals(innerDecodingFormat, 
that.innerDecodingFormat);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(innerDecodingFormat);
+        }
+    }
+
+    /**
+     * It is used to wrap the encoding format and expose the desired changelog mode. It only
+     * works for insert-only formats.
+     */
+    protected static class EncodingFormatWrapper
+            implements
+                EncodingFormat<SerializationSchema<RowData>> {
+
+        private final EncodingFormat<SerializationSchema<RowData>> 
innerEncodingFormat;
+
+        public static final ChangelogMode SINK_CHANGELOG_MODE =
+                ChangelogMode.newBuilder()
+                        .addContainedKind(RowKind.INSERT)
+                        .addContainedKind(RowKind.UPDATE_AFTER)
+                        .addContainedKind(RowKind.DELETE)
+                        .build();
+
+        public EncodingFormatWrapper(
+                EncodingFormat<SerializationSchema<RowData>> 
innerEncodingFormat) {
+            this.innerEncodingFormat = innerEncodingFormat;
+        }
+
+        @Override
+        public SerializationSchema<RowData> createRuntimeEncoder(
+                DynamicTableSink.Context context, DataType consumedDataType) {
+            return innerEncodingFormat.createRuntimeEncoder(context, 
consumedDataType);
+        }
+
+        @Override
+        public ChangelogMode getChangelogMode() {
+            return SINK_CHANGELOG_MODE;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (obj == this) {
+                return true;
+            }
+
+            if (obj == null || getClass() != obj.getClass()) {
+                return false;
+            }
+
+            EncodingFormatWrapper that = (EncodingFormatWrapper) obj;
+            return Objects.equals(innerEncodingFormat, 
that.innerEncodingFormat);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(innerEncodingFormat);
+        }
+    }
+}
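
And a matching sketch (not part of the patch) for the upsert factory: the identifier is 'upsert-kafka-inlong', the required options are the bootstrap servers, 'topic', 'key.format' and 'value.format', and a PRIMARY KEY NOT ENFORCED is mandatory. The topic, servers, and columns below are assumptions for illustration.

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class UpsertKafkaInlongSinkSketch {
        public static void main(String[] args) {
            TableEnvironment tEnv =
                    TableEnvironment.create(EnvironmentSettings.inStreamingMode());
            // 'sink.buffer-flush.max-rows' and 'sink.buffer-flush.interval' are optional but,
            // per validateSinkBufferFlush, must both be greater than zero to enable buffering.
            tEnv.executeSql(
                    "CREATE TABLE upsert_sink (\n"
                            + "  id BIGINT,\n"
                            + "  cnt BIGINT,\n"
                            + "  PRIMARY KEY (id) NOT ENFORCED\n"
                            + ") WITH (\n"
                            + "  'connector' = 'upsert-kafka-inlong',\n"
                            + "  'topic' = 'output_topic',\n"
                            + "  'properties.bootstrap.servers' = 'localhost:9092',\n"
                            + "  'key.format' = 'json',\n"
                            + "  'value.format' = 'json'\n"
                            + ")");
        }
    }
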
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
new file mode 100644
index 0000000000..7eeb7cd7a8
--- /dev/null
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.inlong.sort.kafka.table.KafkaDynamicTableFactory
+org.apache.inlong.sort.kafka.table.UpsertKafkaDynamicTableFactory
\ No newline at end of file
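
These SPI entries are what let Flink's FactoryUtil resolve the two factories by identifier at planning time. A minimal sketch (not part of the patch), assuming the connector jar is on the classpath:

    import org.apache.flink.table.factories.DynamicTableSourceFactory;
    import org.apache.flink.table.factories.FactoryUtil;

    public class FactoryDiscoverySketch {
        public static void main(String[] args) {
            // FactoryUtil scans META-INF/services/org.apache.flink.table.factories.Factory
            // and matches factoryIdentifier() against the value of the 'connector' option.
            DynamicTableSourceFactory factory =
                    FactoryUtil.discoverFactory(
                            Thread.currentThread().getContextClassLoader(),
                            DynamicTableSourceFactory.class,
                            "kafka-inlong");
            System.out.println(factory.getClass().getName());
            // expected: org.apache.inlong.sort.kafka.table.KafkaDynamicTableFactory
        }
    }
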
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pom.xml 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pom.xml
index 38c475a573..dd789c6bd5 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pom.xml
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pom.xml
@@ -42,6 +42,7 @@
         <module>tubemq</module>
         <module>hbase</module>
         <module>hudi</module>
+        <module>kafka</module>
     </modules>
 
     <properties>
diff --git a/licenses/inlong-sort-connectors/LICENSE 
b/licenses/inlong-sort-connectors/LICENSE
index 3ac170a2d3..ec46cd86db 100644
--- a/licenses/inlong-sort-connectors/LICENSE
+++ b/licenses/inlong-sort-connectors/LICENSE
@@ -815,6 +815,12 @@
  Source  : flink-connector-hbase-2.2 1.15.4 (Please note that the software 
have been modified.)
  License : https://github.com/apache/flink/blob/master/LICENSE
 
+1.3.20 
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
+       
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/UpsertKafkaDynamicTableFactory.java
+       
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaConnectorOptionsUtil.java
+  Source  : org.apache.flink:flink-connector-kafka:1.15.4 (Please note that 
the software have been modified.)
+  License : https://github.com/apache/flink/blob/master/LICENSE
+
 =======================================================================
 Apache InLong Subcomponents:
 
diff --git a/pom.xml b/pom.xml
index 65086684c4..3d03a436ad 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1192,6 +1192,12 @@
                 <version>${testcontainers.version}</version>
                 <scope>test</scope>
             </dependency>
+            <dependency>
+                <groupId>org.testcontainers</groupId>
+                <artifactId>jdbc</artifactId>
+                <version>${testcontainers.version}</version>
+                <scope>test</scope>
+            </dependency>
             <dependency>
                 <groupId>org.testcontainers</groupId>
                 <artifactId>mysql</artifactId>
@@ -1200,7 +1206,19 @@
             </dependency>
             <dependency>
                 <groupId>org.testcontainers</groupId>
-                <artifactId>jdbc</artifactId>
+                <artifactId>postgresql</artifactId>
+                <version>${testcontainers.version}</version>
+                <scope>test</scope>
+            </dependency>
+            <dependency>
+                <groupId>org.testcontainers</groupId>
+                <artifactId>mongodb</artifactId>
+                <version>${testcontainers.version}</version>
+                <scope>test</scope>
+            </dependency>
+            <dependency>
+                <groupId>org.testcontainers</groupId>
+                <artifactId>clickhouse</artifactId>
                 <version>${testcontainers.version}</version>
                 <scope>test</scope>
             </dependency>
