This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new f2110a4316 [INLONG-10055][Sort] Support flink-connector-jdbc based on 
flink 1.18 (#10073)
f2110a4316 is described below

commit f2110a43167cf12646956730a7997b8099b88d9d
Author: AloysZhang <aloyszh...@apache.org>
AuthorDate: Fri Apr 26 18:52:12 2024 +0800

    [INLONG-10055][Sort] Support flink-connector-jdbc based on flink 1.18 
(#10073)
---
 .../src/main/assemblies/sort-connectors-v1.18.xml  |   9 +
 .../sort-flink-v1.18/sort-connectors/jdbc/pom.xml  | 117 +++++
 .../jdbc/internal/GenericJdbcSinkFunction.java     |  82 ++++
 .../sort/jdbc/internal/JdbcOutputFormat.java       | 499 +++++++++++++++++++++
 .../jdbc/internal/TableJdbcUpsertOutputFormat.java | 225 ++++++++++
 .../sort/jdbc/table/JdbcDynamicTableFactory.java   | 357 +++++++++++++++
 .../sort/jdbc/table/JdbcDynamicTableSink.java      | 148 ++++++
 .../sort/jdbc/table/JdbcOutputFormatBuilder.java   | 283 ++++++++++++
 .../org.apache.flink.table.factories.Factory       |  16 +
 .../sort-flink-v1.18/sort-connectors/pom.xml       |   1 +
 licenses/inlong-sort-connectors/LICENSE            |   9 +
 11 files changed, 1746 insertions(+)

diff --git a/inlong-distribution/src/main/assemblies/sort-connectors-v1.18.xml 
b/inlong-distribution/src/main/assemblies/sort-connectors-v1.18.xml
index 61465915fa..04e1fce16b 100644
--- a/inlong-distribution/src/main/assemblies/sort-connectors-v1.18.xml
+++ b/inlong-distribution/src/main/assemblies/sort-connectors-v1.18.xml
@@ -36,5 +36,14 @@
             <fileMode>0644</fileMode>
         </fileSet>
 
+        <fileSet>
+            
<directory>../inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/jdbc/target</directory>
+            <outputDirectory>inlong-sort/connectors</outputDirectory>
+            <includes>
+                
<include>sort-connector-jdbc-v1.18-${project.version}.jar</include>
+            </includes>
+            <fileMode>0644</fileMode>
+        </fileSet>
+
     </fileSets>
 </assembly>
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/jdbc/pom.xml 
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/jdbc/pom.xml
new file mode 100644
index 0000000000..dcd7e24afc
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/jdbc/pom.xml
@@ -0,0 +1,117 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements. See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License. You may obtain a copy of the License at
+  ~
+  ~ http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.inlong</groupId>
+        <artifactId>sort-connectors-v1.18</artifactId>
+        <version>1.13.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>sort-connector-jdbc-v1.18</artifactId>
+    <packaging>jar</packaging>
+    <name>Apache InLong - Sort-connector-jdbc</name>
+
+    <properties>
+        <flink.connector.jdbc.version>3.1.2-1.18</flink.connector.jdbc.version>
+        
<inlong.root.dir>${project.parent.parent.parent.parent.parent.basedir}</inlong.root.dir>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>sort-connector-base</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-base</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-jdbc</artifactId>
+            <version>${flink.connector.jdbc.version}</version>
+        </dependency>
+
+        <!--for mysql-->
+        <dependency>
+            <groupId>mysql</groupId>
+            <artifactId>mysql-connector-java</artifactId>
+            <scope>compile</scope>
+        </dependency>
+
+        <!-- Postgres -->
+
+        <dependency>
+            <groupId>org.postgresql</groupId>
+            <artifactId>postgresql</artifactId>
+        </dependency>
+
+        <!-- Oracle -->
+        <dependency>
+            <groupId>com.oracle.database.jdbc</groupId>
+            <artifactId>ojdbc8</artifactId>
+        </dependency>
+
+        <!-- SQL Server -->
+        <dependency>
+            <groupId>com.microsoft.sqlserver</groupId>
+            <artifactId>mssql-jdbc</artifactId>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>shade-flink</id>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <phase>package</phase>
+                        <configuration>
+                            <filters>
+                                <filter>
+                                    <artifact>*:*</artifact>
+                                    <excludes>
+                                        <exclude>META-INF/*.SF</exclude>
+                                        <exclude>META-INF/*.DSA</exclude>
+                                        <exclude>META-INF/*.RSA</exclude>
+                                    </excludes>
+                                </filter>
+                                <filter>
+                                    
<artifact>org.apache.inlong:sort-connector-*</artifact>
+                                    <includes>
+                                        <include>org/apache/inlong/**</include>
+                                        
<include>META-INF/services/org.apache.flink.table.factories.Factory</include>
+                                    </includes>
+                                </filter>
+                            </filters>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/GenericJdbcSinkFunction.java
 
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/GenericJdbcSinkFunction.java
new file mode 100644
index 0000000000..39c6e45ec0
--- /dev/null
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/GenericJdbcSinkFunction.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.jdbc.internal;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+
+/** A generic SinkFunction for JDBC.
+ * Modify from {@link 
org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction}
+ * */
+@Internal
+public class GenericJdbcSinkFunction<T> extends RichSinkFunction<T>
+        implements
+            CheckpointedFunction,
+            InputTypeConfigurable {
+
+    private final JdbcOutputFormat<T, ?, ?> outputFormat;
+
+    public GenericJdbcSinkFunction(@Nonnull JdbcOutputFormat<T, ?, ?> 
outputFormat) {
+        this.outputFormat = Preconditions.checkNotNull(outputFormat);
+    }
+
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        super.open(parameters);
+        RuntimeContext ctx = getRuntimeContext();
+        outputFormat.setRuntimeContext(ctx);
+        outputFormat.open(ctx.getIndexOfThisSubtask(), 
ctx.getNumberOfParallelSubtasks());
+    }
+
+    @Override
+    public void invoke(T value, Context context) throws IOException {
+        outputFormat.writeRecord(value);
+    }
+
+    @Override
+    public void initializeState(FunctionInitializationContext context) {
+    }
+
+    @Override
+    public void snapshotState(FunctionSnapshotContext context) throws 
Exception {
+        outputFormat.flush();
+    }
+
+    @Override
+    public void close() {
+        outputFormat.close();
+    }
+
+    @Override
+    public void setInputType(TypeInformation<?> type, ExecutionConfig 
executionConfig) {
+        outputFormat.setInputType(type, executionConfig);
+    }
+}
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcOutputFormat.java
 
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcOutputFormat.java
new file mode 100644
index 0000000000..835ed77e00
--- /dev/null
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcOutputFormat.java
@@ -0,0 +1,499 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.jdbc.internal;
+
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.SinkMetricData;
+import org.apache.inlong.sort.base.util.CalculateObjectSizeUtils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.io.RichOutputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
+import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
+import 
org.apache.flink.connector.jdbc.internal.connection.JdbcConnectionProvider;
+import 
org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider;
+import 
org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor;
+import 
org.apache.flink.connector.jdbc.internal.options.InternalJdbcConnectionOptions;
+import org.apache.flink.connector.jdbc.internal.options.JdbcDmlOptions;
+import 
org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatementImpl;
+import org.apache.flink.connector.jdbc.utils.JdbcUtils;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+import org.apache.flink.util.function.SerializableFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.io.Flushable;
+import java.io.IOException;
+import java.io.Serializable;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
+import static 
org.apache.flink.connector.jdbc.utils.JdbcUtils.setRecordToStatement;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** A JDBC outputFormat that supports batching records before writing records 
to database.
+ *  Modify from {@link 
org.apache.flink.connector.jdbc.internal.JdbcOutputFormat}
+ * */
+@Internal
+public class JdbcOutputFormat<In, JdbcIn, JdbcExec extends 
JdbcBatchStatementExecutor<JdbcIn>>
+        extends
+            RichOutputFormat<In>
+        implements
+            Flushable,
+            InputTypeConfigurable {
+
+    protected final JdbcConnectionProvider connectionProvider;
+    @Nullable
+    private TypeSerializer<In> serializer;
+
+    // audit
+    private final String inlongMetric;
+    private final String auditHostAndPorts;
+    private final String auditKeys;
+    private SinkMetricData sinkMetricData;
+    private Long rowCount = 0L;
+    private Long dataSize = 0L;
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public void setInputType(TypeInformation<?> type, ExecutionConfig 
executionConfig) {
+        if (executionConfig.isObjectReuseEnabled()) {
+            this.serializer = (TypeSerializer<In>) 
type.createSerializer(executionConfig);
+        }
+    }
+
+    /**
+     * An interface to extract a value from given argument.
+     *
+     * @param <F> The type of given argument
+     * @param <T> The type of the return value
+     */
+    public interface RecordExtractor<F, T> extends Function<F, T>, 
Serializable {
+
+        static <T> JdbcOutputFormat.RecordExtractor<T, T> identity() {
+            return x -> x;
+        }
+    }
+
+    /**
+     * A factory for creating {@link JdbcBatchStatementExecutor} instance.
+     *
+     * @param <T> The type of instance.
+     */
+    public interface StatementExecutorFactory<T extends 
JdbcBatchStatementExecutor<?>>
+            extends
+                SerializableFunction<RuntimeContext, T> {
+    }
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(JdbcOutputFormat.class);
+
+    private final JdbcExecutionOptions executionOptions;
+    private final JdbcOutputFormat.StatementExecutorFactory<JdbcExec> 
statementExecutorFactory;
+    private final JdbcOutputFormat.RecordExtractor<In, JdbcIn> 
jdbcRecordExtractor;
+
+    private transient JdbcExec jdbcStatementExecutor;
+    private transient int batchCount = 0;
+    private transient volatile boolean closed = false;
+
+    private transient ScheduledExecutorService scheduler;
+    private transient ScheduledFuture<?> scheduledFuture;
+    private transient volatile Exception flushException;
+
+    public JdbcOutputFormat(
+            @Nonnull JdbcConnectionProvider connectionProvider,
+            @Nonnull JdbcExecutionOptions executionOptions,
+            @Nonnull JdbcOutputFormat.StatementExecutorFactory<JdbcExec> 
statementExecutorFactory,
+            @Nonnull JdbcOutputFormat.RecordExtractor<In, JdbcIn> 
recordExtractor,
+            String inlongMetric,
+            String auditHostAndPorts,
+            String auditKeys) {
+        this.connectionProvider = checkNotNull(connectionProvider);
+        this.executionOptions = checkNotNull(executionOptions);
+        this.statementExecutorFactory = checkNotNull(statementExecutorFactory);
+        this.jdbcRecordExtractor = checkNotNull(recordExtractor);
+        // audit
+        this.inlongMetric = inlongMetric;
+        this.auditHostAndPorts = auditHostAndPorts;
+        this.auditKeys = auditKeys;
+    }
+
+    @Override
+    public void configure(Configuration parameters) {
+    }
+
+    /**
+     * Connects to the target database and initializes the prepared statement.
+     *
+     * @param taskNumber The number of the parallel instance.
+     */
+    @Override
+    public void open(int taskNumber, int numTasks) throws IOException {
+        try {
+            connectionProvider.getOrEstablishConnection();
+        } catch (Exception e) {
+            throw new IOException("unable to open JDBC writer", e);
+        }
+        // audit
+        MetricOption metricOption = MetricOption.builder()
+                .withInlongLabels(inlongMetric)
+                .withAuditAddress(auditHostAndPorts)
+                .withAuditKeys(auditKeys)
+                .build();
+        if (metricOption != null) {
+            sinkMetricData = new SinkMetricData(metricOption, 
getRuntimeContext().getMetricGroup());
+        }
+
+        jdbcStatementExecutor = 
createAndOpenStatementExecutor(statementExecutorFactory);
+        if (executionOptions.getBatchIntervalMs() != 0 && 
executionOptions.getBatchSize() != 1) {
+            this.scheduler =
+                    Executors.newScheduledThreadPool(
+                            1, new 
ExecutorThreadFactory("jdbc-upsert-output-format"));
+            this.scheduledFuture =
+                    this.scheduler.scheduleWithFixedDelay(
+                            () -> {
+                                synchronized (JdbcOutputFormat.this) {
+                                    if (!closed) {
+                                        try {
+                                            flush();
+                                        } catch (Exception e) {
+                                            flushException = e;
+                                        }
+                                    }
+                                }
+                            },
+                            executionOptions.getBatchIntervalMs(),
+                            executionOptions.getBatchIntervalMs(),
+                            TimeUnit.MILLISECONDS);
+        }
+    }
+
+    private JdbcExec createAndOpenStatementExecutor(
+            JdbcOutputFormat.StatementExecutorFactory<JdbcExec> 
statementExecutorFactory) throws IOException {
+        JdbcExec exec = statementExecutorFactory.apply(getRuntimeContext());
+        try {
+            exec.prepareStatements(connectionProvider.getConnection());
+        } catch (SQLException e) {
+            throw new IOException("unable to open JDBC writer", e);
+        }
+        return exec;
+    }
+
+    private void checkFlushException() {
+        if (flushException != null) {
+            throw new RuntimeException("Writing records to JDBC failed.", 
flushException);
+        }
+    }
+
+    @Override
+    public final synchronized void writeRecord(In record) throws IOException {
+        checkFlushException();
+        updateMetric(record);
+        try {
+            In recordCopy = copyIfNecessary(record);
+            addToBatch(record, jdbcRecordExtractor.apply(recordCopy));
+            batchCount++;
+            if (executionOptions.getBatchSize() > 0
+                    && batchCount >= executionOptions.getBatchSize()) {
+                flush();
+                if (sinkMetricData != null) {
+                    sinkMetricData.invoke(rowCount, dataSize);
+                }
+            }
+        } catch (Exception e) {
+            throw new IOException("Writing records to JDBC failed.", e);
+        }
+    }
+
+    private void updateMetric(In record) {
+        rowCount++;
+        dataSize += CalculateObjectSizeUtils.getDataSize(record);
+    }
+
+    private In copyIfNecessary(In record) {
+        return serializer == null ? record : serializer.copy(record);
+    }
+
+    protected void addToBatch(In original, JdbcIn extracted) throws 
SQLException {
+        jdbcStatementExecutor.addToBatch(extracted);
+    }
+
+    @Override
+    public synchronized void flush() throws IOException {
+        checkFlushException();
+
+        for (int i = 0; i <= executionOptions.getMaxRetries(); i++) {
+            try {
+                attemptFlush();
+                batchCount = 0;
+                break;
+            } catch (SQLException e) {
+                LOG.error("JDBC executeBatch error, retry times = {}", i, e);
+                if (i >= executionOptions.getMaxRetries()) {
+                    throw new IOException(e);
+                }
+                try {
+                    if (!connectionProvider.isConnectionValid()) {
+                        updateExecutor(true);
+                    }
+                } catch (Exception exception) {
+                    LOG.error(
+                            "JDBC connection is not valid, and reestablish 
connection failed.",
+                            exception);
+                    throw new IOException("Reestablish JDBC connection 
failed", exception);
+                }
+                try {
+                    Thread.sleep(1000 * i);
+                } catch (InterruptedException ex) {
+                    Thread.currentThread().interrupt();
+                    throw new IOException(
+                            "unable to flush; interrupted while doing another 
attempt", e);
+                }
+            }
+        }
+        // audit, flush metrics data
+        if (sinkMetricData != null) {
+            sinkMetricData.flushAuditData();
+        }
+    }
+
+    protected void attemptFlush() throws SQLException {
+        jdbcStatementExecutor.executeBatch();
+    }
+
+    /** Executes prepared statement and closes all resources of this instance. 
*/
+    @Override
+    public synchronized void close() {
+        if (!closed) {
+            closed = true;
+
+            if (this.scheduledFuture != null) {
+                scheduledFuture.cancel(false);
+                this.scheduler.shutdown();
+            }
+
+            if (batchCount > 0) {
+                try {
+                    flush();
+                } catch (Exception e) {
+                    LOG.warn("Writing records to JDBC failed.", e);
+                    throw new RuntimeException("Writing records to JDBC 
failed.", e);
+                }
+            }
+
+            // audit, flush metrics data
+            if (sinkMetricData != null) {
+                sinkMetricData.flushAuditData();
+            }
+
+            try {
+                if (jdbcStatementExecutor != null) {
+                    jdbcStatementExecutor.closeStatements();
+                }
+            } catch (SQLException e) {
+                LOG.warn("Close JDBC writer failed.", e);
+            }
+        }
+        connectionProvider.closeConnection();
+        checkFlushException();
+    }
+
+    public static JdbcOutputFormat.Builder builder() {
+        return new JdbcOutputFormat.Builder();
+    }
+
+    /** Builder for a {@link JdbcOutputFormat}. */
+    public static class Builder {
+
+        private InternalJdbcConnectionOptions options;
+        private String[] fieldNames;
+        private String[] keyFields;
+        private int[] fieldTypes;
+
+        // audit
+        private String inlongMetric;
+        private String auditHostAndPorts;
+        private String auditKeys;
+
+        private JdbcExecutionOptions.Builder executionOptionsBuilder =
+                JdbcExecutionOptions.builder();
+
+        /** required, jdbc options. */
+        public JdbcOutputFormat.Builder 
setOptions(InternalJdbcConnectionOptions options) {
+            this.options = options;
+            return this;
+        }
+
+        /** required, field names of this jdbc sink. */
+        public JdbcOutputFormat.Builder setFieldNames(String[] fieldNames) {
+            this.fieldNames = fieldNames;
+            return this;
+        }
+
+        /** required, upsert unique keys. */
+        public JdbcOutputFormat.Builder setKeyFields(String[] keyFields) {
+            this.keyFields = keyFields;
+            return this;
+        }
+
+        /** required, field types of this jdbc sink. */
+        public JdbcOutputFormat.Builder setFieldTypes(int[] fieldTypes) {
+            this.fieldTypes = fieldTypes;
+            return this;
+        }
+
+        /**
+         * optional, flush max size (includes all append, upsert and delete 
records), over this
+         * number of records, will flush data.
+         */
+        public JdbcOutputFormat.Builder setFlushMaxSize(int flushMaxSize) {
+            executionOptionsBuilder.withBatchSize(flushMaxSize);
+            return this;
+        }
+
+        /** optional, flush interval mills, over this time, asynchronous 
threads will flush data. */
+        public JdbcOutputFormat.Builder setFlushIntervalMills(long 
flushIntervalMills) {
+            executionOptionsBuilder.withBatchIntervalMs(flushIntervalMills);
+            return this;
+        }
+
+        /** optional, max retry times for jdbc connector. */
+        public JdbcOutputFormat.Builder setMaxRetryTimes(int maxRetryTimes) {
+            executionOptionsBuilder.withMaxRetries(maxRetryTimes);
+            return this;
+        }
+
+        public JdbcOutputFormat.Builder setInlongMetric(String inlongMetric) {
+            this.inlongMetric = inlongMetric;
+            return this;
+        }
+
+        public JdbcOutputFormat.Builder setAuditHostAndPorts(String 
auditHostAndPorts) {
+            this.auditHostAndPorts = auditHostAndPorts;
+            return this;
+        }
+
+        public JdbcOutputFormat.Builder setAuditKeys(String auditKeys) {
+            this.auditKeys = auditKeys;
+            return this;
+        }
+
+        /**
+         * Finalizes the configuration and checks validity.
+         *
+         * @return Configured JdbcUpsertOutputFormat
+         */
+        public JdbcOutputFormat<Tuple2<Boolean, Row>, Row, 
JdbcBatchStatementExecutor<Row>> build() {
+            checkNotNull(options, "No options supplied.");
+            checkNotNull(fieldNames, "No fieldNames supplied.");
+            JdbcDmlOptions dml =
+                    JdbcDmlOptions.builder()
+                            .withTableName(options.getTableName())
+                            .withDialect(options.getDialect())
+                            .withFieldNames(fieldNames)
+                            .withKeyFields(keyFields)
+                            .withFieldTypes(fieldTypes)
+                            .build();
+            if (dml.getKeyFields().isPresent() && 
dml.getKeyFields().get().length > 0) {
+                return new TableJdbcUpsertOutputFormat(
+                        new SimpleJdbcConnectionProvider(options),
+                        dml,
+                        executionOptionsBuilder.build(),
+                        inlongMetric,
+                        auditHostAndPorts,
+                        auditKeys);
+            } else {
+                // warn: don't close over builder fields
+                String sql =
+                        FieldNamedPreparedStatementImpl.parseNamedStatement(
+                                options.getDialect()
+                                        .getInsertIntoStatement(
+                                                dml.getTableName(), 
dml.getFieldNames()),
+                                new HashMap<>());
+                return new JdbcOutputFormat<>(
+                        new SimpleJdbcConnectionProvider(options),
+                        executionOptionsBuilder.build(),
+                        ctx -> createSimpleRowExecutor(
+                                sql,
+                                dml.getFieldTypes(),
+                                
ctx.getExecutionConfig().isObjectReuseEnabled()),
+                        tuple2 -> {
+                            Preconditions.checkArgument(tuple2.f0);
+                            return tuple2.f1;
+                        },
+                        inlongMetric,
+                        auditHostAndPorts,
+                        auditKeys);
+            }
+        }
+    }
+
+    static JdbcBatchStatementExecutor<Row> createSimpleRowExecutor(
+            String sql, int[] fieldTypes, boolean objectReuse) {
+        return JdbcBatchStatementExecutor.simple(
+                sql,
+                createRowJdbcStatementBuilder(fieldTypes),
+                objectReuse ? Row::copy : Function.identity());
+    }
+
+    /**
+     * Creates a {@link JdbcStatementBuilder} for {@link Row} using the 
provided SQL types array.
+     * Uses {@link JdbcUtils#setRecordToStatement}
+     */
+    static JdbcStatementBuilder<Row> createRowJdbcStatementBuilder(int[] 
types) {
+        return (st, record) -> setRecordToStatement(st, types, record);
+    }
+
+    public void updateExecutor(boolean reconnect) throws SQLException, 
ClassNotFoundException {
+        jdbcStatementExecutor.closeStatements();
+        jdbcStatementExecutor.prepareStatements(
+                reconnect
+                        ? connectionProvider.reestablishConnection()
+                        : connectionProvider.getConnection());
+    }
+
+    /** Returns configured {@code JdbcExecutionOptions}. */
+    public JdbcExecutionOptions getExecutionOptions() {
+        return executionOptions;
+    }
+
+    @VisibleForTesting
+    public Connection getConnection() {
+        return connectionProvider.getConnection();
+    }
+}
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/TableJdbcUpsertOutputFormat.java
 
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/TableJdbcUpsertOutputFormat.java
new file mode 100644
index 0000000000..5c5ff2978c
--- /dev/null
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/TableJdbcUpsertOutputFormat.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.jdbc.internal;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
+import 
org.apache.flink.connector.jdbc.internal.connection.JdbcConnectionProvider;
+import 
org.apache.flink.connector.jdbc.internal.executor.InsertOrUpdateJdbcExecutor;
+import 
org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor;
+import org.apache.flink.connector.jdbc.internal.options.JdbcDmlOptions;
+import 
org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatementImpl;
+import org.apache.flink.types.Row;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.function.Function;
+
+import static org.apache.flink.connector.jdbc.utils.JdbcUtils.getPrimaryKey;
+import static 
org.apache.flink.connector.jdbc.utils.JdbcUtils.setRecordToStatement;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/** Upsert jdbc output format.
+ * Modify from {@link 
org.apache.flink.connector.jdbc.internal.JdbcOutputFormat}
+ * */
+
+class TableJdbcUpsertOutputFormat
+        extends
+            JdbcOutputFormat<Tuple2<Boolean, Row>, Row, 
JdbcBatchStatementExecutor<Row>> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(
+            TableJdbcUpsertOutputFormat.class);
+
+    private JdbcBatchStatementExecutor<Row> deleteExecutor;
+    private final StatementExecutorFactory<JdbcBatchStatementExecutor<Row>> 
deleteStatementExecutorFactory;
+
+    // audit
+    private String inlongMetric;
+    private String auditHostAndPorts;
+    private String auditKeys;
+
+    TableJdbcUpsertOutputFormat(
+            JdbcConnectionProvider connectionProvider,
+            JdbcDmlOptions dmlOptions,
+            JdbcExecutionOptions batchOptions,
+            String inlongMetric,
+            String auditHostAndPorts,
+            String auditKeys) {
+        this(
+                connectionProvider,
+                batchOptions,
+                ctx -> createUpsertRowExecutor(dmlOptions, ctx),
+                ctx -> createDeleteExecutor(dmlOptions, ctx),
+                inlongMetric,
+                auditHostAndPorts,
+                auditKeys);
+    }
+
+    @VisibleForTesting
+    TableJdbcUpsertOutputFormat(
+            JdbcConnectionProvider connectionProvider,
+            JdbcExecutionOptions batchOptions,
+            StatementExecutorFactory<JdbcBatchStatementExecutor<Row>> 
statementExecutorFactory,
+            StatementExecutorFactory<JdbcBatchStatementExecutor<Row>> 
deleteStatementExecutorFactory,
+            String inlongMetric,
+            String auditHostAndPorts,
+            String auditKeys) {
+        super(connectionProvider, batchOptions, statementExecutorFactory, 
tuple2 -> tuple2.f1,
+                inlongMetric, auditHostAndPorts, auditKeys);
+        this.deleteStatementExecutorFactory = deleteStatementExecutorFactory;
+    }
+
+    @Override
+    public void open(int taskNumber, int numTasks) throws IOException {
+        super.open(taskNumber, numTasks);
+        deleteExecutor = 
deleteStatementExecutorFactory.apply(getRuntimeContext());
+        try {
+            
deleteExecutor.prepareStatements(connectionProvider.getConnection());
+        } catch (SQLException e) {
+            throw new IOException(e);
+        }
+    }
+
+    private static JdbcBatchStatementExecutor<Row> createDeleteExecutor(
+            JdbcDmlOptions dmlOptions, RuntimeContext ctx) {
+        int[] pkFields =
+                Arrays.stream(dmlOptions.getFieldNames())
+                        
.mapToInt(Arrays.asList(dmlOptions.getFieldNames())::indexOf)
+                        .toArray();
+        int[] pkTypes =
+                dmlOptions.getFieldTypes() == null
+                        ? null
+                        : Arrays.stream(pkFields).map(f -> 
dmlOptions.getFieldTypes()[f]).toArray();
+        String deleteSql =
+                FieldNamedPreparedStatementImpl.parseNamedStatement(
+                        dmlOptions
+                                .getDialect()
+                                .getDeleteStatement(
+                                        dmlOptions.getTableName(), 
dmlOptions.getFieldNames()),
+                        new HashMap<>());
+        return createKeyedRowExecutor(pkFields, pkTypes, deleteSql);
+    }
+
+    @Override
+    protected void addToBatch(Tuple2<Boolean, Row> original, Row extracted) 
throws SQLException {
+        if (original.f0) {
+            super.addToBatch(original, extracted);
+        } else {
+            deleteExecutor.addToBatch(extracted);
+        }
+    }
+
+    @Override
+    public synchronized void close() {
+        try {
+            super.close();
+        } finally {
+            try {
+                if (deleteExecutor != null) {
+                    deleteExecutor.closeStatements();
+                }
+            } catch (SQLException e) {
+                LOG.warn("unable to close delete statement runner", e);
+            }
+        }
+    }
+
+    @Override
+    protected void attemptFlush() throws SQLException {
+        super.attemptFlush();
+        deleteExecutor.executeBatch();
+    }
+
+    @Override
+    public void updateExecutor(boolean reconnect) throws SQLException, 
ClassNotFoundException {
+        super.updateExecutor(reconnect);
+        deleteExecutor.closeStatements();
+        deleteExecutor.prepareStatements(connectionProvider.getConnection());
+    }
+
+    private static JdbcBatchStatementExecutor<Row> createKeyedRowExecutor(
+            int[] pkFields, int[] pkTypes, String sql) {
+        return JdbcBatchStatementExecutor.keyed(
+                sql,
+                createRowKeyExtractor(pkFields),
+                (st, record) -> setRecordToStatement(
+                        st, pkTypes, 
createRowKeyExtractor(pkFields).apply(record)));
+    }
+
+    private static JdbcBatchStatementExecutor<Row> createUpsertRowExecutor(
+            JdbcDmlOptions opt, RuntimeContext ctx) {
+        checkArgument(opt.getKeyFields().isPresent());
+
+        int[] pkFields =
+                Arrays.stream(opt.getKeyFields().get())
+                        .mapToInt(Arrays.asList(opt.getFieldNames())::indexOf)
+                        .toArray();
+        int[] pkTypes =
+                opt.getFieldTypes() == null
+                        ? null
+                        : Arrays.stream(pkFields).map(f -> 
opt.getFieldTypes()[f]).toArray();
+
+        return opt.getDialect()
+                .getUpsertStatement(
+                        opt.getTableName(), opt.getFieldNames(), 
opt.getKeyFields().get())
+                .map(
+                        sql -> createSimpleRowExecutor(
+                                parseNamedStatement(sql),
+                                opt.getFieldTypes(),
+                                
ctx.getExecutionConfig().isObjectReuseEnabled()))
+                .orElseGet(
+                        () -> new InsertOrUpdateJdbcExecutor<>(
+                                parseNamedStatement(
+                                        opt.getDialect()
+                                                .getRowExistsStatement(
+                                                        opt.getTableName(),
+                                                        
opt.getKeyFields().get())),
+                                parseNamedStatement(
+                                        opt.getDialect()
+                                                .getInsertIntoStatement(
+                                                        opt.getTableName(),
+                                                        opt.getFieldNames())),
+                                parseNamedStatement(
+                                        opt.getDialect()
+                                                .getUpdateStatement(
+                                                        opt.getTableName(),
+                                                        opt.getFieldNames(),
+                                                        
opt.getKeyFields().get())),
+                                createRowJdbcStatementBuilder(pkTypes),
+                                
createRowJdbcStatementBuilder(opt.getFieldTypes()),
+                                
createRowJdbcStatementBuilder(opt.getFieldTypes()),
+                                createRowKeyExtractor(pkFields),
+                                ctx.getExecutionConfig().isObjectReuseEnabled()
+                                        ? Row::copy
+                                        : Function.identity()));
+    }
+
+    private static String parseNamedStatement(String statement) {
+        return FieldNamedPreparedStatementImpl.parseNamedStatement(statement, 
new HashMap<>());
+    }
+
+    private static Function<Row, Row> createRowKeyExtractor(int[] pkFields) {
+        return row -> getPrimaryKey(row, pkFields);
+    }
+}
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableFactory.java
 
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableFactory.java
new file mode 100644
index 0000000000..866d498133
--- /dev/null
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableFactory.java
@@ -0,0 +1,357 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.jdbc.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
+import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
+import org.apache.flink.connector.jdbc.dialect.JdbcDialectLoader;
+import 
org.apache.flink.connector.jdbc.internal.options.InternalJdbcConnectionOptions;
+import org.apache.flink.connector.jdbc.internal.options.JdbcDmlOptions;
+import org.apache.flink.connector.jdbc.internal.options.JdbcReadOptions;
+import org.apache.flink.connector.jdbc.table.JdbcDynamicTableSource;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.lookup.LookupOptions;
+import org.apache.flink.table.connector.source.lookup.cache.DefaultLookupCache;
+import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static 
org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.DRIVER;
+import static 
org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.LOOKUP_CACHE_MAX_ROWS;
+import static 
org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.LOOKUP_CACHE_MISSING_KEY;
+import static 
org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.LOOKUP_CACHE_TTL;
+import static 
org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.LOOKUP_MAX_RETRIES;
+import static 
org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.MAX_RETRY_TIMEOUT;
+import static 
org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.PASSWORD;
+import static 
org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.SCAN_AUTO_COMMIT;
+import static 
org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.SCAN_FETCH_SIZE;
+import static 
org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.SCAN_PARTITION_COLUMN;
+import static 
org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.SCAN_PARTITION_LOWER_BOUND;
+import static 
org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.SCAN_PARTITION_NUM;
+import static 
org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.SCAN_PARTITION_UPPER_BOUND;
+import static 
org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.SINK_BUFFER_FLUSH_INTERVAL;
+import static 
org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.SINK_BUFFER_FLUSH_MAX_ROWS;
+import static 
org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.SINK_MAX_RETRIES;
+import static 
org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.SINK_PARALLELISM;
+import static 
org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.TABLE_NAME;
+import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.URL;
+import static 
org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.USERNAME;
+import static org.apache.inlong.sort.base.Constants.AUDIT_KEYS;
+import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
+
+/**
+ * Factory for creating configured instances of {@link JdbcDynamicTableSource} 
and {@link
+ * JdbcDynamicTableSink}.
+ * Modify from {@link 
org.apache.flink.connector.jdbc.table.JdbcDynamicTableFactory}
+ */
+@Internal
+public class JdbcDynamicTableFactory implements DynamicTableSourceFactory, 
DynamicTableSinkFactory {
+
+    public static final String IDENTIFIER = "jdbc-inlong";
+
+    @Override
+    public DynamicTableSink createDynamicTableSink(Context context) {
+        final FactoryUtil.TableFactoryHelper helper =
+                FactoryUtil.createTableFactoryHelper(this, context);
+        final ReadableConfig config = helper.getOptions();
+
+        helper.validate();
+        validateConfigOptions(config, context.getClassLoader());
+        validateDataTypeWithJdbcDialect(
+                context.getPhysicalRowDataType(), config.get(URL), 
context.getClassLoader());
+        InternalJdbcConnectionOptions jdbcOptions =
+                getJdbcOptions(config, context.getClassLoader());
+
+        // inlong audit
+        String inlongMetric = config.getOptional(INLONG_METRIC).orElse(null);
+        String auditHostAndPorts = 
config.getOptional(INLONG_AUDIT).orElse(null);
+        String auditKeys = config.getOptional(AUDIT_KEYS).orElse(null);
+
+        return new JdbcDynamicTableSink(
+                jdbcOptions,
+                getJdbcExecutionOptions(config),
+                getJdbcDmlOptions(
+                        jdbcOptions,
+                        context.getPhysicalRowDataType(),
+                        context.getPrimaryKeyIndexes()),
+                context.getPhysicalRowDataType(),
+                inlongMetric,
+                auditHostAndPorts,
+                auditKeys);
+    }
+
+    @Override
+    public DynamicTableSource createDynamicTableSource(Context context) {
+        final FactoryUtil.TableFactoryHelper helper =
+                FactoryUtil.createTableFactoryHelper(this, context);
+        final ReadableConfig config = helper.getOptions();
+
+        helper.validate();
+        validateConfigOptions(config, context.getClassLoader());
+        validateDataTypeWithJdbcDialect(
+                context.getPhysicalRowDataType(), config.get(URL), 
context.getClassLoader());
+
+        return new JdbcDynamicTableSource(
+                getJdbcOptions(helper.getOptions(), context.getClassLoader()),
+                getJdbcReadOptions(helper.getOptions()),
+                helper.getOptions().get(LookupOptions.MAX_RETRIES),
+                getLookupCache(config),
+                context.getPhysicalRowDataType());
+    }
+
+    private static void validateDataTypeWithJdbcDialect(
+            DataType dataType, String url, ClassLoader classLoader) {
+        final JdbcDialect dialect = JdbcDialectLoader.load(url, classLoader);
+        dialect.validate((RowType) dataType.getLogicalType());
+    }
+
+    private InternalJdbcConnectionOptions getJdbcOptions(
+            ReadableConfig readableConfig, ClassLoader classLoader) {
+        final String url = readableConfig.get(URL);
+        final InternalJdbcConnectionOptions.Builder builder =
+                InternalJdbcConnectionOptions.builder()
+                        .setClassLoader(classLoader)
+                        .setDBUrl(url)
+                        .setTableName(readableConfig.get(TABLE_NAME))
+                        .setDialect(JdbcDialectLoader.load(url, classLoader))
+                        
.setParallelism(readableConfig.getOptional(SINK_PARALLELISM).orElse(null))
+                        .setConnectionCheckTimeoutSeconds(
+                                (int) 
readableConfig.get(MAX_RETRY_TIMEOUT).getSeconds());
+
+        readableConfig.getOptional(DRIVER).ifPresent(builder::setDriverName);
+        readableConfig.getOptional(USERNAME).ifPresent(builder::setUsername);
+        readableConfig.getOptional(PASSWORD).ifPresent(builder::setPassword);
+        return builder.build();
+    }
+
+    private JdbcReadOptions getJdbcReadOptions(ReadableConfig readableConfig) {
+        final Optional<String> partitionColumnName =
+                readableConfig.getOptional(SCAN_PARTITION_COLUMN);
+        final JdbcReadOptions.Builder builder = JdbcReadOptions.builder();
+        if (partitionColumnName.isPresent()) {
+            builder.setPartitionColumnName(partitionColumnName.get());
+            
builder.setPartitionLowerBound(readableConfig.get(SCAN_PARTITION_LOWER_BOUND));
+            
builder.setPartitionUpperBound(readableConfig.get(SCAN_PARTITION_UPPER_BOUND));
+            builder.setNumPartitions(readableConfig.get(SCAN_PARTITION_NUM));
+        }
+        
readableConfig.getOptional(SCAN_FETCH_SIZE).ifPresent(builder::setFetchSize);
+        builder.setAutoCommit(readableConfig.get(SCAN_AUTO_COMMIT));
+        return builder.build();
+    }
+
+    private JdbcExecutionOptions getJdbcExecutionOptions(ReadableConfig 
config) {
+        final JdbcExecutionOptions.Builder builder = new 
JdbcExecutionOptions.Builder();
+        builder.withBatchSize(config.get(SINK_BUFFER_FLUSH_MAX_ROWS));
+        
builder.withBatchIntervalMs(config.get(SINK_BUFFER_FLUSH_INTERVAL).toMillis());
+        builder.withMaxRetries(config.get(SINK_MAX_RETRIES));
+        return builder.build();
+    }
+
+    private JdbcDmlOptions getJdbcDmlOptions(
+            InternalJdbcConnectionOptions jdbcOptions, DataType dataType, 
int[] primaryKeyIndexes) {
+
+        String[] keyFields =
+                Arrays.stream(primaryKeyIndexes)
+                        .mapToObj(i -> DataType.getFieldNames(dataType).get(i))
+                        .toArray(String[]::new);
+
+        return JdbcDmlOptions.builder()
+                .withTableName(jdbcOptions.getTableName())
+                .withDialect(jdbcOptions.getDialect())
+                .withFieldNames(DataType.getFieldNames(dataType).toArray(new 
String[0]))
+                .withKeyFields(keyFields.length > 0 ? keyFields : null)
+                .build();
+    }
+
+    @Nullable
+    private LookupCache getLookupCache(ReadableConfig tableOptions) {
+        LookupCache cache = null;
+        // Legacy cache options
+        if (tableOptions.get(LOOKUP_CACHE_MAX_ROWS) > 0
+                && tableOptions.get(LOOKUP_CACHE_TTL).compareTo(Duration.ZERO) 
> 0) {
+            cache =
+                    DefaultLookupCache.newBuilder()
+                            
.maximumSize(tableOptions.get(LOOKUP_CACHE_MAX_ROWS))
+                            
.expireAfterWrite(tableOptions.get(LOOKUP_CACHE_TTL))
+                            
.cacheMissingKey(tableOptions.get(LOOKUP_CACHE_MISSING_KEY))
+                            .build();
+        }
+        if (tableOptions
+                .get(LookupOptions.CACHE_TYPE)
+                .equals(LookupOptions.LookupCacheType.PARTIAL)) {
+            cache = DefaultLookupCache.fromConfig(tableOptions);
+        }
+        return cache;
+    }
+
+    @Override
+    public String factoryIdentifier() {
+        return IDENTIFIER;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+        Set<ConfigOption<?>> requiredOptions = new HashSet<>();
+        requiredOptions.add(URL);
+        requiredOptions.add(TABLE_NAME);
+        return requiredOptions;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        Set<ConfigOption<?>> optionalOptions = new HashSet<>();
+        optionalOptions.add(DRIVER);
+        optionalOptions.add(USERNAME);
+        optionalOptions.add(PASSWORD);
+        optionalOptions.add(SCAN_PARTITION_COLUMN);
+        optionalOptions.add(SCAN_PARTITION_LOWER_BOUND);
+        optionalOptions.add(SCAN_PARTITION_UPPER_BOUND);
+        optionalOptions.add(SCAN_PARTITION_NUM);
+        optionalOptions.add(SCAN_FETCH_SIZE);
+        optionalOptions.add(SCAN_AUTO_COMMIT);
+        optionalOptions.add(LOOKUP_CACHE_MAX_ROWS);
+        optionalOptions.add(LOOKUP_CACHE_TTL);
+        optionalOptions.add(LOOKUP_MAX_RETRIES);
+        optionalOptions.add(LOOKUP_CACHE_MISSING_KEY);
+        optionalOptions.add(SINK_BUFFER_FLUSH_MAX_ROWS);
+        optionalOptions.add(SINK_BUFFER_FLUSH_INTERVAL);
+        optionalOptions.add(SINK_MAX_RETRIES);
+        optionalOptions.add(SINK_PARALLELISM);
+        optionalOptions.add(MAX_RETRY_TIMEOUT);
+        optionalOptions.add(LookupOptions.CACHE_TYPE);
+        optionalOptions.add(LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_ACCESS);
+        optionalOptions.add(LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_WRITE);
+        optionalOptions.add(LookupOptions.PARTIAL_CACHE_MAX_ROWS);
+        optionalOptions.add(LookupOptions.PARTIAL_CACHE_CACHE_MISSING_KEY);
+        optionalOptions.add(LookupOptions.MAX_RETRIES);
+        optionalOptions.add(AUDIT_KEYS);
+        optionalOptions.add(INLONG_METRIC);
+        optionalOptions.add(INLONG_AUDIT);
+        return optionalOptions;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> forwardOptions() {
+        return Stream.of(
+                URL,
+                TABLE_NAME,
+                USERNAME,
+                PASSWORD,
+                DRIVER,
+                SINK_BUFFER_FLUSH_MAX_ROWS,
+                SINK_BUFFER_FLUSH_INTERVAL,
+                SINK_MAX_RETRIES,
+                MAX_RETRY_TIMEOUT,
+                SCAN_FETCH_SIZE,
+                SCAN_AUTO_COMMIT)
+                .collect(Collectors.toSet());
+    }
+
+    private void validateConfigOptions(ReadableConfig config, ClassLoader 
classLoader) {
+        String jdbcUrl = config.get(URL);
+        JdbcDialectLoader.load(jdbcUrl, classLoader);
+
+        checkAllOrNone(config, new ConfigOption[]{USERNAME, PASSWORD});
+
+        checkAllOrNone(
+                config,
+                new ConfigOption[]{
+                        SCAN_PARTITION_COLUMN,
+                        SCAN_PARTITION_NUM,
+                        SCAN_PARTITION_LOWER_BOUND,
+                        SCAN_PARTITION_UPPER_BOUND
+                });
+
+        if (config.getOptional(SCAN_PARTITION_LOWER_BOUND).isPresent()
+                && config.getOptional(SCAN_PARTITION_UPPER_BOUND).isPresent()) 
{
+            long lowerBound = config.get(SCAN_PARTITION_LOWER_BOUND);
+            long upperBound = config.get(SCAN_PARTITION_UPPER_BOUND);
+            if (lowerBound > upperBound) {
+                throw new IllegalArgumentException(
+                        String.format(
+                                "'%s'='%s' must not be larger than '%s'='%s'.",
+                                SCAN_PARTITION_LOWER_BOUND.key(),
+                                lowerBound,
+                                SCAN_PARTITION_UPPER_BOUND.key(),
+                                upperBound));
+            }
+        }
+
+        checkAllOrNone(config, new ConfigOption[]{LOOKUP_CACHE_MAX_ROWS, 
LOOKUP_CACHE_TTL});
+
+        if (config.get(LOOKUP_MAX_RETRIES) < 0) {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "The value of '%s' option shouldn't be negative, 
but is %s.",
+                            LOOKUP_MAX_RETRIES.key(), 
config.get(LOOKUP_MAX_RETRIES)));
+        }
+
+        if (config.get(SINK_MAX_RETRIES) < 0) {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "The value of '%s' option shouldn't be negative, 
but is %s.",
+                            SINK_MAX_RETRIES.key(), 
config.get(SINK_MAX_RETRIES)));
+        }
+
+        if (config.get(MAX_RETRY_TIMEOUT).getSeconds() <= 0) {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "The value of '%s' option must be in second 
granularity and shouldn't be smaller than 1 second, but is %s.",
+                            MAX_RETRY_TIMEOUT.key(),
+                            config.get(
+                                    ConfigOptions.key(MAX_RETRY_TIMEOUT.key())
+                                            .stringType()
+                                            .noDefaultValue())));
+        }
+    }
+
+    private void checkAllOrNone(ReadableConfig config, ConfigOption<?>[] 
configOptions) {
+        int presentCount = 0;
+        for (ConfigOption configOption : configOptions) {
+            if (config.getOptional(configOption).isPresent()) {
+                presentCount++;
+            }
+        }
+        String[] propertyNames =
+                
Arrays.stream(configOptions).map(ConfigOption::key).toArray(String[]::new);
+        Preconditions.checkArgument(
+                configOptions.length == presentCount || presentCount == 0,
+                "Either all or none of the following options should be 
provided:\n"
+                        + String.join("\n", propertyNames));
+    }
+}
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableSink.java
 
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableSink.java
new file mode 100644
index 0000000000..6721130414
--- /dev/null
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableSink.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.jdbc.table;
+
+import org.apache.inlong.sort.jdbc.internal.GenericJdbcSinkFunction;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
+import 
org.apache.flink.connector.jdbc.internal.options.InternalJdbcConnectionOptions;
+import org.apache.flink.connector.jdbc.internal.options.JdbcDmlOptions;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.SinkFunctionProvider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.RowKind;
+
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** A {@link DynamicTableSink} for JDBC.
+ * Modify from {@link 
org.apache.flink.connector.jdbc.table.JdbcDynamicTableSink}
+ * */
+@Internal
+public class JdbcDynamicTableSink implements DynamicTableSink {
+
+    private final InternalJdbcConnectionOptions jdbcOptions;
+    private final JdbcExecutionOptions executionOptions;
+    private final JdbcDmlOptions dmlOptions;
+    private final DataType physicalRowDataType;
+    private final String dialectName;
+
+    // audit
+    String inlongMetric;
+    String auditHostAndPorts;
+    String auditKeys;
+
+    public JdbcDynamicTableSink(
+            InternalJdbcConnectionOptions jdbcOptions,
+            JdbcExecutionOptions executionOptions,
+            JdbcDmlOptions dmlOptions,
+            DataType physicalRowDataType,
+            String inlongMetric,
+            String auditHostAndPorts,
+            String auditKeys) {
+        this.jdbcOptions = jdbcOptions;
+        this.executionOptions = executionOptions;
+        this.dmlOptions = dmlOptions;
+        this.physicalRowDataType = physicalRowDataType;
+        this.dialectName = dmlOptions.getDialect().dialectName();
+        // audit
+        this.inlongMetric = inlongMetric;
+        this.auditHostAndPorts = auditHostAndPorts;
+        this.auditKeys = auditKeys;
+    }
+
+    @Override
+    public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
+        validatePrimaryKey(requestedMode);
+        return ChangelogMode.newBuilder()
+                .addContainedKind(RowKind.INSERT)
+                .addContainedKind(RowKind.DELETE)
+                .addContainedKind(RowKind.UPDATE_AFTER)
+                .build();
+    }
+
+    private void validatePrimaryKey(ChangelogMode requestedMode) {
+        checkState(
+                ChangelogMode.insertOnly().equals(requestedMode)
+                        || dmlOptions.getKeyFields().isPresent(),
+                "please declare primary key for sink table when query contains 
update/delete record.");
+    }
+
+    @Override
+    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
+        final TypeInformation<RowData> rowDataTypeInformation =
+                context.createTypeInformation(physicalRowDataType);
+        final JdbcOutputFormatBuilder builder = new JdbcOutputFormatBuilder();
+
+        builder.setJdbcOptions(jdbcOptions);
+        builder.setJdbcDmlOptions(dmlOptions);
+        builder.setJdbcExecutionOptions(executionOptions);
+        builder.setRowDataTypeInfo(rowDataTypeInformation);
+        builder.setFieldDataTypes(
+                DataType.getFieldDataTypes(physicalRowDataType).toArray(new 
DataType[0]));
+        // audit
+        builder.setInlongMetric(inlongMetric);
+        builder.setAuditHostAndPorts(auditHostAndPorts);
+        builder.setAuditKeys(auditKeys);
+        return SinkFunctionProvider.of(
+                new GenericJdbcSinkFunction<>(builder.build()), 
jdbcOptions.getParallelism());
+    }
+
+    @Override
+    public DynamicTableSink copy() {
+        return new JdbcDynamicTableSink(
+                jdbcOptions, executionOptions, dmlOptions, physicalRowDataType,
+                inlongMetric, auditHostAndPorts, auditKeys);
+    }
+
+    @Override
+    public String asSummaryString() {
+        return "JDBC:" + dialectName;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof JdbcDynamicTableSink)) {
+            return false;
+        }
+        JdbcDynamicTableSink that = (JdbcDynamicTableSink) o;
+        return Objects.equals(jdbcOptions, that.jdbcOptions)
+                && Objects.equals(executionOptions, that.executionOptions)
+                && Objects.equals(dmlOptions, that.dmlOptions)
+                && Objects.equals(physicalRowDataType, 
that.physicalRowDataType)
+                && Objects.equals(dialectName, that.dialectName)
+                && Objects.equals(inlongMetric, that.inlongMetric)
+                && Objects.equals(auditHostAndPorts, that.auditHostAndPorts)
+                && Objects.equals(auditKeys, that.auditKeys);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(
+                jdbcOptions, executionOptions, dmlOptions, 
physicalRowDataType, dialectName,
+                inlongMetric, auditHostAndPorts, auditKeys);
+    }
+}
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcOutputFormatBuilder.java
 
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcOutputFormatBuilder.java
new file mode 100644
index 0000000000..436b763ee5
--- /dev/null
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcOutputFormatBuilder.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.jdbc.table;
+
+import org.apache.inlong.sort.jdbc.internal.JdbcOutputFormat;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
+import org.apache.flink.connector.jdbc.converter.JdbcRowConverter;
+import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
+import 
org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider;
+import 
org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor;
+import 
org.apache.flink.connector.jdbc.internal.executor.TableBufferReducedStatementExecutor;
+import 
org.apache.flink.connector.jdbc.internal.executor.TableBufferedStatementExecutor;
+import 
org.apache.flink.connector.jdbc.internal.executor.TableInsertOrUpdateStatementExecutor;
+import 
org.apache.flink.connector.jdbc.internal.executor.TableSimpleStatementExecutor;
+import 
org.apache.flink.connector.jdbc.internal.options.InternalJdbcConnectionOptions;
+import org.apache.flink.connector.jdbc.internal.options.JdbcDmlOptions;
+import org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatement;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.function.Function;
+
+import static org.apache.flink.table.data.RowData.createFieldGetter;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Builder for {@link JdbcOutputFormat} for Table/SQL.
+ * Modify from {@link 
org.apache.flink.connector.jdbc.table.JdbcOutputFormatBuilder}
+ * */
+public class JdbcOutputFormatBuilder implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private InternalJdbcConnectionOptions jdbcOptions;
+    private JdbcExecutionOptions executionOptions;
+    private JdbcDmlOptions dmlOptions;
+    private TypeInformation<RowData> rowDataTypeInformation;
+    private DataType[] fieldDataTypes;
+
+    // audit
+    String inlongMetric;
+    String auditHostAndPorts;
+    String auditKeys;
+
+    public JdbcOutputFormatBuilder() {
+    }
+
+    public JdbcOutputFormatBuilder 
setJdbcOptions(InternalJdbcConnectionOptions jdbcOptions) {
+        this.jdbcOptions = jdbcOptions;
+        return this;
+    }
+
+    public JdbcOutputFormatBuilder 
setJdbcExecutionOptions(JdbcExecutionOptions executionOptions) {
+        this.executionOptions = executionOptions;
+        return this;
+    }
+
+    public JdbcOutputFormatBuilder setJdbcDmlOptions(JdbcDmlOptions 
dmlOptions) {
+        this.dmlOptions = dmlOptions;
+        return this;
+    }
+
+    public JdbcOutputFormatBuilder setRowDataTypeInfo(TypeInformation<RowData> 
rowDataTypeInfo) {
+        this.rowDataTypeInformation = rowDataTypeInfo;
+        return this;
+    }
+
+    public JdbcOutputFormatBuilder setFieldDataTypes(DataType[] 
fieldDataTypes) {
+        this.fieldDataTypes = fieldDataTypes;
+        return this;
+    }
+
+    public JdbcOutputFormatBuilder setInlongMetric(String inlongMetric) {
+        this.inlongMetric = inlongMetric;
+        return this;
+    }
+
+    public JdbcOutputFormatBuilder setAuditHostAndPorts(String 
auditHostAndPorts) {
+        this.auditHostAndPorts = auditHostAndPorts;
+        return this;
+    }
+
+    public JdbcOutputFormatBuilder setAuditKeys(String auditKeys) {
+        this.auditKeys = auditKeys;
+        return this;
+    }
+
+    public JdbcOutputFormat<RowData, ?, ?> build() {
+        checkNotNull(jdbcOptions, "jdbc options can not be null");
+        checkNotNull(dmlOptions, "jdbc dml options can not be null");
+        checkNotNull(executionOptions, "jdbc execution options can not be 
null");
+
+        final LogicalType[] logicalTypes =
+                Arrays.stream(fieldDataTypes)
+                        .map(DataType::getLogicalType)
+                        .toArray(LogicalType[]::new);
+        if (dmlOptions.getKeyFields().isPresent() && 
dmlOptions.getKeyFields().get().length > 0) {
+            // upsert query
+            return new JdbcOutputFormat<>(
+                    new SimpleJdbcConnectionProvider(jdbcOptions),
+                    executionOptions,
+                    ctx -> createBufferReduceExecutor(
+                            dmlOptions, ctx, rowDataTypeInformation, 
logicalTypes),
+                    JdbcOutputFormat.RecordExtractor.identity(),
+                    inlongMetric,
+                    auditHostAndPorts,
+                    auditKeys);
+        } else {
+            // append only query
+            final String sql =
+                    dmlOptions
+                            .getDialect()
+                            .getInsertIntoStatement(
+                                    dmlOptions.getTableName(), 
dmlOptions.getFieldNames());
+            return new JdbcOutputFormat<>(
+                    new SimpleJdbcConnectionProvider(jdbcOptions),
+                    executionOptions,
+                    ctx -> createSimpleBufferedExecutor(
+                            ctx,
+                            dmlOptions.getDialect(),
+                            dmlOptions.getFieldNames(),
+                            logicalTypes,
+                            sql,
+                            rowDataTypeInformation),
+                    JdbcOutputFormat.RecordExtractor.identity(),
+                    inlongMetric,
+                    auditHostAndPorts,
+                    auditKeys);
+        }
+    }
+
+    private static JdbcBatchStatementExecutor<RowData> 
createBufferReduceExecutor(
+            JdbcDmlOptions opt,
+            RuntimeContext ctx,
+            TypeInformation<RowData> rowDataTypeInfo,
+            LogicalType[] fieldTypes) {
+        checkArgument(opt.getKeyFields().isPresent());
+        JdbcDialect dialect = opt.getDialect();
+        String tableName = opt.getTableName();
+        String[] pkNames = opt.getKeyFields().get();
+        int[] pkFields =
+                Arrays.stream(pkNames)
+                        .mapToInt(Arrays.asList(opt.getFieldNames())::indexOf)
+                        .toArray();
+        LogicalType[] pkTypes =
+                Arrays.stream(pkFields).mapToObj(f -> 
fieldTypes[f]).toArray(LogicalType[]::new);
+        final TypeSerializer<RowData> typeSerializer =
+                rowDataTypeInfo.createSerializer(ctx.getExecutionConfig());
+        final Function<RowData, RowData> valueTransform =
+                ctx.getExecutionConfig().isObjectReuseEnabled()
+                        ? typeSerializer::copy
+                        : Function.identity();
+
+        return new TableBufferReducedStatementExecutor(
+                createUpsertRowExecutor(
+                        dialect,
+                        tableName,
+                        opt.getFieldNames(),
+                        fieldTypes,
+                        pkFields,
+                        pkNames,
+                        pkTypes),
+                createDeleteExecutor(dialect, tableName, pkNames, pkTypes),
+                createRowKeyExtractor(fieldTypes, pkFields),
+                valueTransform);
+    }
+
+    private static JdbcBatchStatementExecutor<RowData> 
createSimpleBufferedExecutor(
+            RuntimeContext ctx,
+            JdbcDialect dialect,
+            String[] fieldNames,
+            LogicalType[] fieldTypes,
+            String sql,
+            TypeInformation<RowData> rowDataTypeInfo) {
+        final TypeSerializer<RowData> typeSerializer =
+                rowDataTypeInfo.createSerializer(ctx.getExecutionConfig());
+        return new TableBufferedStatementExecutor(
+                createSimpleRowExecutor(dialect, fieldNames, fieldTypes, sql),
+                ctx.getExecutionConfig().isObjectReuseEnabled()
+                        ? typeSerializer::copy
+                        : Function.identity());
+    }
+
+    private static JdbcBatchStatementExecutor<RowData> createUpsertRowExecutor(
+            JdbcDialect dialect,
+            String tableName,
+            String[] fieldNames,
+            LogicalType[] fieldTypes,
+            int[] pkFields,
+            String[] pkNames,
+            LogicalType[] pkTypes) {
+        return dialect.getUpsertStatement(tableName, fieldNames, pkNames)
+                .map(sql -> createSimpleRowExecutor(dialect, fieldNames, 
fieldTypes, sql))
+                .orElseGet(
+                        () -> createInsertOrUpdateExecutor(
+                                dialect,
+                                tableName,
+                                fieldNames,
+                                fieldTypes,
+                                pkFields,
+                                pkNames,
+                                pkTypes));
+    }
+
+    private static JdbcBatchStatementExecutor<RowData> createDeleteExecutor(
+            JdbcDialect dialect, String tableName, String[] pkNames, 
LogicalType[] pkTypes) {
+        String deleteSql = dialect.getDeleteStatement(tableName, pkNames);
+        return createSimpleRowExecutor(dialect, pkNames, pkTypes, deleteSql);
+    }
+
+    private static JdbcBatchStatementExecutor<RowData> createSimpleRowExecutor(
+            JdbcDialect dialect, String[] fieldNames, LogicalType[] 
fieldTypes, final String sql) {
+        final JdbcRowConverter rowConverter = 
dialect.getRowConverter(RowType.of(fieldTypes));
+        return new TableSimpleStatementExecutor(
+                connection -> 
FieldNamedPreparedStatement.prepareStatement(connection, sql, fieldNames),
+                rowConverter);
+    }
+
+    private static JdbcBatchStatementExecutor<RowData> 
createInsertOrUpdateExecutor(
+            JdbcDialect dialect,
+            String tableName,
+            String[] fieldNames,
+            LogicalType[] fieldTypes,
+            int[] pkFields,
+            String[] pkNames,
+            LogicalType[] pkTypes) {
+        final String existStmt = dialect.getRowExistsStatement(tableName, 
pkNames);
+        final String insertStmt = dialect.getInsertIntoStatement(tableName, 
fieldNames);
+        final String updateStmt = dialect.getUpdateStatement(tableName, 
fieldNames, pkNames);
+        return new TableInsertOrUpdateStatementExecutor(
+                connection -> FieldNamedPreparedStatement.prepareStatement(
+                        connection, existStmt, pkNames),
+                connection -> FieldNamedPreparedStatement.prepareStatement(
+                        connection, insertStmt, fieldNames),
+                connection -> FieldNamedPreparedStatement.prepareStatement(
+                        connection, updateStmt, fieldNames),
+                dialect.getRowConverter(RowType.of(pkTypes)),
+                dialect.getRowConverter(RowType.of(fieldTypes)),
+                dialect.getRowConverter(RowType.of(fieldTypes)),
+                createRowKeyExtractor(fieldTypes, pkFields));
+    }
+
+    private static Function<RowData, RowData> createRowKeyExtractor(
+            LogicalType[] logicalTypes, int[] pkFields) {
+        final RowData.FieldGetter[] fieldGetters = new 
RowData.FieldGetter[pkFields.length];
+        for (int i = 0; i < pkFields.length; i++) {
+            fieldGetters[i] = createFieldGetter(logicalTypes[pkFields[i]], 
pkFields[i]);
+        }
+        return row -> getPrimaryKey(row, fieldGetters);
+    }
+
+    private static RowData getPrimaryKey(RowData row, RowData.FieldGetter[] 
fieldGetters) {
+        GenericRowData pkRow = new GenericRowData(fieldGetters.length);
+        for (int i = 0; i < fieldGetters.length; i++) {
+            pkRow.setField(i, fieldGetters[i].getFieldOrNull(row));
+        }
+        return pkRow;
+    }
+}
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/jdbc/src/main/resources/META-INF.services/org.apache.flink.table.factories.Factory
 
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/jdbc/src/main/resources/META-INF.services/org.apache.flink.table.factories.Factory
new file mode 100644
index 0000000000..a14e9cc440
--- /dev/null
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/jdbc/src/main/resources/META-INF.services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.inlong.sort.jdbc.table.JdbcDynamicTableFactory
\ No newline at end of file
diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pom.xml 
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pom.xml
index a3fe402733..cf81e1a2ac 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pom.xml
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pom.xml
@@ -32,6 +32,7 @@
 
     <modules>
         <module>pulsar</module>
+        <module>jdbc</module>
     </modules>
 
     <properties>
diff --git a/licenses/inlong-sort-connectors/LICENSE 
b/licenses/inlong-sort-connectors/LICENSE
index bd944b384c..e0443b0fd6 100644
--- a/licenses/inlong-sort-connectors/LICENSE
+++ b/licenses/inlong-sort-connectors/LICENSE
@@ -838,6 +838,15 @@
   Source  : org.apache.flink:flink-connector-pulsar:4.1.0-1.18 (Please note 
that the software have been modified.)
   License : https://github.com/apache/flink-connector-pulsar/blob/main/LICENSE
 
+  1.3.22 
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableFactory.java
+         
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableSink.java
+         
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcOutputFormatBuilder.java
+         
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/GenericJdbcSinkFunction.java
+         
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcOutputFormat.java
+         
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/TableJdbcUpsertOutputFormat.java
+    Source  : org.apache.flink:flink-connector-pulsar:4.1.0-1.18 (Please note 
that the software have been modified.)
+    License : 
https://github.com/apache/flink-connector-pulsar/blob/main/LICENSE
+
 =======================================================================
 Apache InLong Subcomponents:
 

Reply via email to