This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 3291aeb26 [INLONG-6819][Sort] Add multi table sink for PostgresSQL 
(#6820)
3291aeb26 is described below

commit 3291aeb260cdbea5bdba86f9a044dc08fd157d2a
Author: kuansix <490305...@qq.com>
AuthorDate: Fri Dec 16 19:22:18 2022 +0800

    [INLONG-6819][Sort] Add multi table sink for PostgresSQL (#6820)
---
 .../JdbcMultiBatchingComm.java}                    | 139 +----
 .../internal/JdbcMultiBatchingOutputFormat.java    | 639 +++++++++++++++++++++
 .../jdbc/table/JdbcDynamicOutputFormatBuilder.java |  52 ++
 .../sort/jdbc/table/JdbcDynamicTableFactory.java   |  61 +-
 .../sort/jdbc/table/JdbcDynamicTableSink.java      |  47 +-
 5 files changed, 797 insertions(+), 141 deletions(-)

diff --git 
a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicOutputFormatBuilder.java
 
b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcMultiBatchingComm.java
similarity index 60%
copy from 
inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicOutputFormatBuilder.java
copy to 
inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcMultiBatchingComm.java
index 4a0bfa228..b22ac81b4 100644
--- 
a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicOutputFormatBuilder.java
+++ 
b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcMultiBatchingComm.java
@@ -15,14 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sort.jdbc.table;
+package org.apache.inlong.sort.jdbc.internal;
 
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
 import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
-import 
org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider;
 import org.apache.flink.connector.jdbc.internal.converter.JdbcRowConverter;
 import 
org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor;
 import 
org.apache.flink.connector.jdbc.internal.executor.TableBufferReducedStatementExecutor;
@@ -30,51 +28,24 @@ import 
org.apache.flink.connector.jdbc.internal.executor.TableBufferedStatementE
 import 
org.apache.flink.connector.jdbc.internal.executor.TableInsertOrUpdateStatementExecutor;
 import 
org.apache.flink.connector.jdbc.internal.executor.TableSimpleStatementExecutor;
 import org.apache.flink.connector.jdbc.internal.options.JdbcDmlOptions;
-import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
 import org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatement;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
-import org.apache.inlong.sort.base.dirty.DirtyOptions;
-import org.apache.inlong.sort.base.dirty.sink.DirtySink;
-import org.apache.inlong.sort.jdbc.internal.JdbcBatchingOutputFormat;
 
-import java.io.Serializable;
 import java.util.Arrays;
 import java.util.function.Function;
 
 import static org.apache.flink.table.data.RowData.createFieldGetter;
 import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * Copy from org.apache.flink:flink-connector-jdbc_2.11:1.13.5
- *
- * Builder for {@link JdbcBatchingOutputFormat} for Table/SQL.
- * Add an option `sink.ignore.changelog` to support insert-only mode without 
primaryKey.
+ * Common functions for a JDBC multi-table output format to get or create JDBC executors.
  */
-public class JdbcDynamicOutputFormatBuilder implements Serializable {
-
-    private static final long serialVersionUID = 1L;
-
-    private JdbcOptions jdbcOptions;
-    private JdbcExecutionOptions executionOptions;
-    private JdbcDmlOptions dmlOptions;
-    private boolean appendMode;
-    private TypeInformation<RowData> rowDataTypeInformation;
-    private DataType[] fieldDataTypes;
-    private String inlongMetric;
-    private String auditHostAndPorts;
-    private DirtyOptions dirtyOptions;
-    private DirtySink<Object> dirtySink;
-
-    public JdbcDynamicOutputFormatBuilder() {
+public class JdbcMultiBatchingComm {
 
-    }
-
-    private static JdbcBatchStatementExecutor<RowData> 
createBufferReduceExecutor(
+    public static JdbcBatchStatementExecutor<RowData> 
createBufferReduceExecutor(
             JdbcDmlOptions opt,
             RuntimeContext ctx,
             TypeInformation<RowData> rowDataTypeInfo,
@@ -95,6 +66,7 @@ public class JdbcDynamicOutputFormatBuilder implements 
Serializable {
                 ctx.getExecutionConfig().isObjectReuseEnabled()
                         ? typeSerializer::copy
                         : Function.identity();
+
         return new TableBufferReducedStatementExecutor(
                 createUpsertRowExecutor(
                         dialect,
@@ -109,7 +81,7 @@ public class JdbcDynamicOutputFormatBuilder implements 
Serializable {
                 valueTransform);
     }
 
-    private static JdbcBatchStatementExecutor<RowData> 
createSimpleBufferedExecutor(
+    public static JdbcBatchStatementExecutor<RowData> 
createSimpleBufferedExecutor(
             RuntimeContext ctx,
             JdbcDialect dialect,
             String[] fieldNames,
@@ -201,101 +173,4 @@ public class JdbcDynamicOutputFormatBuilder implements 
Serializable {
         return pkRow;
     }
 
-    public JdbcDynamicOutputFormatBuilder setAppendMode(boolean appendMode) {
-        this.appendMode = appendMode;
-        return this;
-    }
-
-    public JdbcDynamicOutputFormatBuilder setJdbcOptions(JdbcOptions 
jdbcOptions) {
-        this.jdbcOptions = jdbcOptions;
-        return this;
-    }
-
-    public JdbcDynamicOutputFormatBuilder setJdbcExecutionOptions(
-            JdbcExecutionOptions executionOptions) {
-        this.executionOptions = executionOptions;
-        return this;
-    }
-
-    public JdbcDynamicOutputFormatBuilder setJdbcDmlOptions(JdbcDmlOptions 
dmlOptions) {
-        this.dmlOptions = dmlOptions;
-        return this;
-    }
-
-    public JdbcDynamicOutputFormatBuilder setRowDataTypeInfo(
-            TypeInformation<RowData> rowDataTypeInfo) {
-        this.rowDataTypeInformation = rowDataTypeInfo;
-        return this;
-    }
-
-    public JdbcDynamicOutputFormatBuilder setFieldDataTypes(DataType[] 
fieldDataTypes) {
-        this.fieldDataTypes = fieldDataTypes;
-        return this;
-    }
-
-    public JdbcDynamicOutputFormatBuilder setInLongMetric(String inlongMetric) 
{
-        this.inlongMetric = inlongMetric;
-        return this;
-    }
-
-    public JdbcDynamicOutputFormatBuilder setAuditHostAndPorts(String 
auditHostAndPorts) {
-        this.auditHostAndPorts = auditHostAndPorts;
-        return this;
-    }
-
-    public JdbcDynamicOutputFormatBuilder setDirtyOptions(DirtyOptions 
dirtyOptions) {
-        this.dirtyOptions = dirtyOptions;
-        return this;
-    }
-
-    public JdbcDynamicOutputFormatBuilder setDirtySink(DirtySink<Object> 
dirtySink) {
-        this.dirtySink = dirtySink;
-        return this;
-    }
-
-    public JdbcBatchingOutputFormat<RowData, ?, ?> build() {
-        checkNotNull(jdbcOptions, "jdbc options can not be null");
-        checkNotNull(dmlOptions, "jdbc dml options can not be null");
-        checkNotNull(executionOptions, "jdbc execution options can not be 
null");
-
-        final LogicalType[] logicalTypes =
-                Arrays.stream(fieldDataTypes)
-                        .map(DataType::getLogicalType)
-                        .toArray(LogicalType[]::new);
-        if (dmlOptions.getKeyFields().isPresent() && 
dmlOptions.getKeyFields().get().length > 0 && !appendMode) {
-            // upsert query
-            return new JdbcBatchingOutputFormat<>(
-                    new SimpleJdbcConnectionProvider(jdbcOptions),
-                    executionOptions,
-                    ctx -> createBufferReduceExecutor(
-                            dmlOptions, ctx, rowDataTypeInformation, 
logicalTypes),
-                    JdbcBatchingOutputFormat.RecordExtractor.identity(),
-                    inlongMetric,
-                    auditHostAndPorts,
-                    dirtyOptions,
-                    dirtySink);
-        } else {
-            // append only query
-            final String sql =
-                    dmlOptions
-                            .getDialect()
-                            .getInsertIntoStatement(
-                                    dmlOptions.getTableName(), 
dmlOptions.getFieldNames());
-            return new JdbcBatchingOutputFormat<>(
-                    new SimpleJdbcConnectionProvider(jdbcOptions),
-                    executionOptions,
-                    ctx -> createSimpleBufferedExecutor(
-                            ctx,
-                            dmlOptions.getDialect(),
-                            dmlOptions.getFieldNames(),
-                            logicalTypes,
-                            sql,
-                            rowDataTypeInformation),
-                    JdbcBatchingOutputFormat.RecordExtractor.identity(),
-                    inlongMetric,
-                    auditHostAndPorts,
-                    dirtyOptions,
-                    dirtySink);
-        }
-    }
-}
\ No newline at end of file
+}
diff --git 
a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcMultiBatchingOutputFormat.java
 
b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcMultiBatchingOutputFormat.java
new file mode 100644
index 000000000..069d41f31
--- /dev/null
+++ 
b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcMultiBatchingOutputFormat.java
@@ -0,0 +1,639 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.jdbc.internal;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
+import 
org.apache.flink.connector.jdbc.internal.connection.JdbcConnectionProvider;
+import 
org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider;
+import 
org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor;
+import org.apache.flink.connector.jdbc.internal.options.JdbcDmlOptions;
+import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
+import org.apache.inlong.sort.base.format.DynamicSchemaFormatFactory;
+import org.apache.inlong.sort.base.format.JsonDynamicSchemaFormat;
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.MetricState;
+import org.apache.inlong.sort.base.metric.SinkMetricData;
+import org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy;
+import org.apache.inlong.sort.base.util.MetricStateUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT;
+import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT;
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME;
+
+/**
+ * A JDBC multi-table outputFormat that supports batching records before 
writing records to databases.
+ * Add an option `inlong.metric` to support metrics.
+ */
+public class JdbcMultiBatchingOutputFormat<In, JdbcIn, JdbcExec extends 
JdbcBatchStatementExecutor<JdbcIn>>
+        extends
+            AbstractJdbcOutputFormat<In> {
+
+    private static final long serialVersionUID = 1L;
+    private static final Logger LOG = 
LoggerFactory.getLogger(JdbcMultiBatchingOutputFormat.class);
+    // Batching configuration (batch size and flush interval) supplied at construction time.
+    private final JdbcExecutionOptions executionOptions;
+    // Metric labels and audit endpoints handed to MetricOption in open().
+    private final String inlongMetric;
+    private final String auditHostAndPorts;
+    // Number of records buffered since the last flush(); reset to 0 by flush().
+    private transient int batchCount = 0;
+    // Checked by the scheduled flush task; presumably set when the format is closed (not visible here).
+    private transient volatile boolean closed = false;
+    private transient ScheduledExecutorService scheduler;
+    private transient ScheduledFuture<?> scheduledFuture;
+    private transient RuntimeContext runtimeContext;
+    private JdbcDmlOptions dmlOptions;
+    private JdbcOptions jdbcOptions;
+    private boolean appendMode;
+    // The maps below are keyed by table identifier and are re-created in open().
+    // Cache of prepared statement executors.
+    private transient Map<String, JdbcExec> jdbcExecMap = new HashMap<>();
+    // Connection provider opened for each table's executor.
+    private transient Map<String, SimpleJdbcConnectionProvider> 
connectionExecProviderMap = new HashMap<>();
+    // Row type most recently extracted from the table's records.
+    private transient Map<String, RowType> rowTypeMap = new HashMap<>();
+    // Primary key names most recently extracted from the table's records.
+    private transient Map<String, List<String>> pkNameMap = new HashMap<>();
+    // Records buffered for the table until the next flush.
+    private transient Map<String, List<GenericRowData>> recordsMap = new 
HashMap<>();
+    // Exceptions recorded per table; read by checkFlushException(). Populated outside the
+    // code visible here - presumably when a table's flush fails.
+    private transient Map<String, Exception> tableExceptionMap = new 
HashMap<>();
+    // True when the policy is ALERT_WITH_IGNORE or STOP_PARTIAL (computed in open()).
+    private transient Boolean isIgnoreTableException;
+
+    private transient ListState<MetricState> metricStateListState;
+    // Name of the dynamic schema format and the patterns used to derive
+    // database / table / schema names from each record.
+    private final String sinkMultipleFormat;
+    private final String databasePattern;
+    private final String tablePattern;
+    private final String schemaPattern;
+    // Metric values restored from state, used as initial counters in open().
+    private transient MetricState metricState;
+    private SinkMetricData sinkMetricData;
+    // Bytes and rows accumulated since the last metric report; see resetStateAfterFlush().
+    private Long dataSize = 0L;
+    private Long rowSize = 0L;
+    private final SchemaUpdateExceptionPolicy schemaUpdateExceptionPolicy;
+
+    /**
+     * Creates a multi-table JDBC output format.
+     *
+     * @param connectionProvider connection provider handed to the parent output format
+     * @param executionOptions batching options (batch size and flush interval); must not be null
+     * @param dmlOptions DML options; its dialect builds the insert statement for append-only
+     *         tables; must not be null
+     * @param appendMode when {@code true}, plain inserts are used even for tables with primary keys
+     * @param jdbcOptions base JDBC options (url, driver, credentials, dialect) from which the
+     *         per-table connections are derived; must not be null
+     * @param sinkMultipleFormat name of the dynamic schema format used to decode incoming records
+     * @param databasePattern pattern used to extract the database name from a record
+     * @param tablePattern pattern used to extract the table name from a record
+     * @param schemaPattern pattern used to extract the schema name from a record; when blank,
+     *         table identifiers have no schema part
+     * @param inlongMetric InLong metric labels passed to the metric option
+     * @param auditHostAndPorts InLong audit endpoints passed to the metric option
+     * @param schemaUpdateExceptionPolicy policy deciding whether recorded per-table failures are
+     *         ignored or rethrown; must not be null
+     */
+    public JdbcMultiBatchingOutputFormat(
+            @Nonnull JdbcConnectionProvider connectionProvider,
+            @Nonnull JdbcExecutionOptions executionOptions,
+            @Nonnull JdbcDmlOptions dmlOptions,
+            boolean appendMode,
+            @Nonnull JdbcOptions jdbcOptions,
+            String sinkMultipleFormat,
+            String databasePattern,
+            String tablePattern,
+            String schemaPattern,
+            String inlongMetric,
+            String auditHostAndPorts,
+            SchemaUpdateExceptionPolicy schemaUpdateExceptionPolicy) {
+        super(connectionProvider);
+        // Fail fast on the arguments that are dereferenced later at runtime, instead of
+        // surfacing a NullPointerException in the middle of a running job.
+        this.executionOptions = checkNotNull(executionOptions, "executionOptions must not be null");
+        this.dmlOptions = checkNotNull(dmlOptions, "dmlOptions must not be null");
+        this.appendMode = appendMode;
+        this.jdbcOptions = checkNotNull(jdbcOptions, "jdbcOptions must not be null");
+        this.sinkMultipleFormat = sinkMultipleFormat;
+        this.databasePattern = databasePattern;
+        this.tablePattern = tablePattern;
+        this.schemaPattern = schemaPattern;
+        this.inlongMetric = inlongMetric;
+        this.auditHostAndPorts = auditHostAndPorts;
+        this.schemaUpdateExceptionPolicy =
+                checkNotNull(schemaUpdateExceptionPolicy, "schemaUpdateExceptionPolicy must not be null");
+    }
+
+    /**
+     * Initializes metrics and the per-table caches and, when batching is enabled, starts the
+     * periodic flush task.
+     *
+     * <p>No connection is opened here: connections and statement executors are created per
+     * table by {@code getOrCreateStatementExecutor}.
+     *
+     * @param taskNumber The number of the parallel instance.
+     * @param numTasks The number of parallel tasks.
+     */
+    @Override
+    public void open(int taskNumber, int numTasks) throws IOException {
+        this.runtimeContext = getRuntimeContext();
+        MetricOption metricOption = MetricOption.builder()
+                .withInlongLabels(inlongMetric)
+                .withInlongAudit(auditHostAndPorts)
+                .withInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_OUT) : 0L)
+                .withInitBytes(metricState != null ? metricState.getMetricValue(NUM_BYTES_OUT) : 0L)
+                .withRegisterMetric(MetricOption.RegisteredMetric.ALL)
+                .build();
+        if (metricOption != null) {
+            sinkMetricData = new SinkMetricData(metricOption, runtimeContext.getMetricGroup());
+        }
+        // Transient state is lost on serialization, so it is rebuilt for every task instance.
+        jdbcExecMap = new HashMap<>();
+        connectionExecProviderMap = new HashMap<>();
+        pkNameMap = new HashMap<>();
+        rowTypeMap = new HashMap<>();
+        recordsMap = new HashMap<>();
+        tableExceptionMap = new HashMap<>();
+        isIgnoreTableException = schemaUpdateExceptionPolicy.equals(SchemaUpdateExceptionPolicy.ALERT_WITH_IGNORE)
+                || schemaUpdateExceptionPolicy.equals(SchemaUpdateExceptionPolicy.STOP_PARTIAL);
+        // A timer is only needed when records are actually buffered (batch size != 1) and a
+        // flush interval is configured.
+        if (executionOptions.getBatchIntervalMs() != 0 && executionOptions.getBatchSize() != 1) {
+            this.scheduler =
+                    Executors.newScheduledThreadPool(
+                            1, new ExecutorThreadFactory("jdbc-upsert-output-format"));
+            this.scheduledFuture =
+                    this.scheduler.scheduleWithFixedDelay(
+                            () -> {
+                                synchronized (JdbcMultiBatchingOutputFormat.this) {
+                                    if (!closed) {
+                                        try {
+                                            flush();
+                                            if (sinkMetricData != null) {
+                                                sinkMetricData.invoke(rowSize, dataSize);
+                                            }
+                                            resetStateAfterFlush();
+                                        } catch (Exception e) {
+                                            // Previously swallowed silently; log it so a failing
+                                            // periodic flush is visible to operators.
+                                            LOG.error("Scheduled flush of JDBC multi-table records failed.", e);
+                                            if (sinkMetricData != null) {
+                                                sinkMetricData.invokeDirty(rowSize, dataSize);
+                                            }
+                                            resetStateAfterFlush();
+                                        }
+                                    }
+                                }
+                            },
+                            executionOptions.getBatchIntervalMs(),
+                            executionOptions.getBatchIntervalMs(),
+                            TimeUnit.MILLISECONDS);
+        }
+    }
+
+    /**
+     * Gets the cached statement executor for one table, or creates and prepares a new one.
+     *
+     * <p>A new executor is built from the row type and primary keys recorded for the table:
+     * an upsert executor when primary keys exist and append mode is off, otherwise a plain
+     * insert executor. A dedicated connection is opened against the table's database, whose
+     * URL is the configured URL followed by "/" and the database part of the identifier.
+     *
+     * @param tableIdentifier The table identifier for which to get the statement executor.
+     * @return the executor, or {@code null} when the identifier is blank, the connection cannot
+     *         be established, or preparing the statements fails
+     */
+    private JdbcExec getOrCreateStatementExecutor(
+            String tableIdentifier) throws IOException {
+        if (StringUtils.isBlank(tableIdentifier)) {
+            return null;
+        }
+        JdbcExec jdbcExec = jdbcExecMap.get(tableIdentifier);
+        if (null != jdbcExec) {
+            return jdbcExec;
+        }
+        RowType rowType = rowTypeMap.get(tableIdentifier);
+        LogicalType[] logicalTypes = rowType.getFields().stream()
+                .map(RowType.RowField::getType)
+                .toArray(LogicalType[]::new);
+        String[] fieldNames = rowType.getFields().stream()
+                .map(RowType.RowField::getName)
+                .toArray(String[]::new);
+        TypeInformation<RowData> rowDataTypeInfo = InternalTypeInfo.of(rowType);
+        List<String> pkNameList = pkNameMap.get(tableIdentifier);
+        String tableName = getTbNameFromIdentifier(tableIdentifier);
+        StatementExecutorFactory<JdbcExec> statementExecutorFactory;
+        if (CollectionUtils.isNotEmpty(pkNameList) && !appendMode) {
+            // upsert query
+            JdbcDmlOptions createDmlOptions = JdbcDmlOptions.builder()
+                    .withTableName(tableName)
+                    .withDialect(jdbcOptions.getDialect())
+                    .withFieldNames(fieldNames)
+                    .withKeyFields(pkNameList.toArray(new String[0]))
+                    .build();
+            statementExecutorFactory = ctx -> (JdbcExec) JdbcMultiBatchingComm.createBufferReduceExecutor(
+                    createDmlOptions, ctx, rowDataTypeInfo, logicalTypes);
+        } else {
+            // append only query
+            final String sql = dmlOptions
+                    .getDialect()
+                    .getInsertIntoStatement(tableName, fieldNames);
+            statementExecutorFactory = ctx -> (JdbcExec) JdbcMultiBatchingComm.createSimpleBufferedExecutor(
+                    ctx,
+                    dmlOptions.getDialect(),
+                    fieldNames,
+                    logicalTypes,
+                    sql,
+                    rowDataTypeInfo);
+        }
+
+        jdbcExec = statementExecutorFactory.apply(getRuntimeContext());
+        try {
+            JdbcOptions jdbcExecOptions =
+                    JdbcOptions.builder()
+                            .setDBUrl(jdbcOptions.getDbURL() + "/" + getTDbNameFromIdentifier(tableIdentifier))
+                            .setTableName(tableName)
+                            .setDialect(jdbcOptions.getDialect())
+                            .setParallelism(jdbcOptions.getParallelism())
+                            .setConnectionCheckTimeoutSeconds(jdbcOptions.getConnectionCheckTimeoutSeconds())
+                            .setDriverName(jdbcOptions.getDriverName())
+                            .setUsername(jdbcOptions.getUsername().orElse(""))
+                            .setPassword(jdbcOptions.getPassword().orElse(""))
+                            .build();
+            SimpleJdbcConnectionProvider tableConnectionProvider = new SimpleJdbcConnectionProvider(jdbcExecOptions);
+            try {
+                tableConnectionProvider.getOrEstablishConnection();
+            } catch (Exception e) {
+                LOG.error("unable to open JDBC writer, tableIdentifier:{} err:", tableIdentifier, e);
+                return null;
+            }
+            connectionExecProviderMap.put(tableIdentifier, tableConnectionProvider);
+            jdbcExec.prepareStatements(tableConnectionProvider.getConnection());
+        } catch (Exception e) {
+            // Previously swallowed silently; log it so failures while building the options or
+            // preparing the statements can be diagnosed. The null return is kept for callers.
+            LOG.error("unable to prepare JDBC statements, tableIdentifier:{} err:", tableIdentifier, e);
+            return null;
+        }
+        // Only executors that were prepared successfully are cached.
+        jdbcExecMap.put(tableIdentifier, jdbcExec);
+        return jdbcExec;
+    }
+
+    /**
+     * Derives the table name from a table identifier.
+     *
+     * <p>{@code ${dbName}.${tbName}} yields {@code ${tbName}} and
+     * {@code ${dbName}.${schemaName}.${tbName}} yields {@code ${schemaName}.${tbName}}.
+     *
+     * @param tableIdentifier The table identifier for which to get table name.
+     * @return the table name, or {@code null} when the identifier does not consist of exactly
+     *         two or three dot-separated parts
+     */
+    private String getTbNameFromIdentifier(String tableIdentifier) {
+        final String[] parts = tableIdentifier.split("\\.");
+        switch (parts.length) {
+            case 2:
+                return parts[1];
+            case 3:
+                return parts[1] + "." + parts[2];
+            default:
+                return null;
+        }
+    }
+
+    /**
+     * Returns the database name, i.e. the first dot-separated part of the table identifier.
+     *
+     * @param tableIdentifier The table identifier for which to get the database name.
+     */
+    private String getTDbNameFromIdentifier(String tableIdentifier) {
+        return tableIdentifier.split("\\.")[0];
+    }
+
+    /**
+     * Fails the job when the policy is THROW_WITH_STOP and at least one table has a recorded
+     * exception. Each failing table is logged, and the thrown message lists all of them.
+     */
+    private void checkFlushException() {
+        boolean stopOnFailure = schemaUpdateExceptionPolicy.equals(SchemaUpdateExceptionPolicy.THROW_WITH_STOP);
+        if (!stopOnFailure || tableExceptionMap.isEmpty()) {
+            return;
+        }
+        StringBuilder failedTables = new StringBuilder("Writing table get failed, tables are:");
+        for (Map.Entry<String, Exception> failure : tableExceptionMap.entrySet()) {
+            LOG.error("Writing table:{} get err:{}", failure.getKey(), failure.getValue().getMessage());
+            failedTables.append(failure.getKey()).append(",");
+        }
+        // Drop the trailing comma left by the last appended table.
+        failedTables.setLength(failedTables.length() - 1);
+        throw new RuntimeException(failedTables.toString());
+    }
+
+    /**
+     * Extract and write record to recordsMap(buffer)
+     *
+     * <p>The raw payload is read from field 0 of the row and decoded with the configured
+     * dynamic schema format. The table identifier, row type and primary keys are derived from
+     * the payload, and the first physical row is buffered under its table identifier. Once
+     * {@code batchCount} reaches the configured batch size, all buffers are flushed.
+     *
+     * <p>Rows that are not {@link RowData} are ignored. Records whose table identifier or
+     * schema cannot be extracted are logged and dropped.
+     *
+     * @param row The record to write.
+     * @throws IOException if buffering or flushing the records fails
+     */
+    @Override
+    public final synchronized void writeRecord(In row) throws IOException {
+        // Rethrows recorded table failures when the policy is THROW_WITH_STOP.
+        checkFlushException();
+        JsonDynamicSchemaFormat jsonDynamicSchemaFormat =
+                (JsonDynamicSchemaFormat) 
DynamicSchemaFormatFactory.getFormat(sinkMultipleFormat);
+        if (row instanceof RowData) {
+            RowData rowData = (RowData) row;
+            JsonNode rootNode = 
jsonDynamicSchemaFormat.deserialize(rowData.getBinary(0));
+            String tableIdentifier = null;
+            try {
+                // Identifier is "db.table" when no schema pattern is set, else "db.schema.table".
+                if (StringUtils.isBlank(schemaPattern)) {
+                    tableIdentifier = StringUtils.join(
+                            jsonDynamicSchemaFormat.parse(rootNode, 
databasePattern), ".",
+                            jsonDynamicSchemaFormat.parse(rootNode, 
tablePattern));
+                } else {
+                    tableIdentifier = StringUtils.join(
+                            jsonDynamicSchemaFormat.parse(rootNode, 
databasePattern), ".",
+                            jsonDynamicSchemaFormat.parse(rootNode, 
schemaPattern), ".",
+                            jsonDynamicSchemaFormat.parse(rootNode, 
tablePattern));
+                }
+            } catch (Exception e) {
+                // The record is dropped; only an info-level log entry remains.
+                LOG.info("Cal tableIdentifier get Exception:", e);
+                return;
+            }
+            // Counted before schema extraction, so a record dropped below still contributes
+            // to the next metric report.
+            rowSize++;
+            dataSize = dataSize + 
rootNode.toString().getBytes(StandardCharsets.UTF_8).length;
+
+            GenericRowData record = null;
+            try {
+                RowType rowType = 
jsonDynamicSchemaFormat.extractSchema(rootNode);
+                if (rowType != null) {
+                    if (null != rowTypeMap.get(tableIdentifier)) {
+                        if (!rowType.equals(rowTypeMap.get(tableIdentifier))) {
+                            // Schema changed: write out records buffered under the old schema
+                            // before switching the table to the new row type.
+                            attemptFlush();
+                            rowTypeMap.put(tableIdentifier, rowType);
+                            // NOTE(review): updateOneExecutor is defined outside this view;
+                            // presumably it re-creates the executor for this table - confirm.
+                            updateOneExecutor(true, tableIdentifier);
+                        }
+                    } else {
+                        rowTypeMap.put(tableIdentifier, rowType);
+                    }
+                }
+                List<String> pkNameList = 
jsonDynamicSchemaFormat.extractPrimaryKeyNames(rootNode);
+                pkNameMap.put(tableIdentifier, pkNameList);
+                JsonNode physicalData = 
jsonDynamicSchemaFormat.getPhysicalData(rootNode);
+                List<Map<String, String>> physicalDataList = 
jsonDynamicSchemaFormat.jsonNode2Map(physicalData);
+                // Only the first physical row of the payload is written.
+                record = generateRecord(rowType, physicalDataList.get(0));
+                List<RowKind> rowKinds = jsonDynamicSchemaFormat
+                        
.opType2RowKind(jsonDynamicSchemaFormat.getOpType(rootNode));
+                // The last row kind derived from the operation type is applied to the record.
+                record.setRowKind(rowKinds.get(rowKinds.size() - 1));
+            } catch (Exception e) {
+                // The record is dropped; this also covers a null row type, which makes
+                // generateRecord fail.
+                LOG.warn("Extract schema failed", e);
+                return;
+            }
+            try {
+                recordsMap.computeIfAbsent(tableIdentifier, k -> new 
ArrayList<>())
+                        .add(record);
+                batchCount++;
+                // batchCount counts records across all tables, not per table.
+                if (executionOptions.getBatchSize() > 0
+                        && batchCount >= executionOptions.getBatchSize()) {
+                    flush();
+                    if (sinkMetricData != null) {
+                        sinkMetricData.invoke(rowSize, dataSize);
+                    }
+                    resetStateAfterFlush();
+                }
+            } catch (Exception e) {
+                // Report the accumulated rows as dirty, clear the counters and fail the write.
+                if (sinkMetricData != null) {
+                    sinkMetricData.invokeDirty(rowSize, dataSize);
+                }
+                resetStateAfterFlush();
+                throw new IOException("Writing records to JDBC failed.", e);
+            }
+        }
+    }
+
+    /**
+     * Convert fieldMap(data) to GenericRowData with rowType(schema)
+     *
+     * <p>Each field of the row type is looked up by name in {@code fieldMap} and parsed
+     * according to its logical type root. Missing or blank values become {@code null}; types
+     * without a dedicated branch are stored as {@link StringData}.
+     *
+     * @param rowType schema of the target record
+     * @param fieldMap field name to textual value
+     * @return the converted record, with one position per field of {@code rowType}
+     * @throws NumberFormatException if a numeric or time value cannot be parsed
+     */
+    protected GenericRowData generateRecord(RowType rowType, Map<String, String> fieldMap) {
+        List<RowType.RowField> fields = rowType.getFields();
+        int arity = fields.size();
+        GenericRowData record = new GenericRowData(arity);
+        for (int i = 0; i < arity; i++) {
+            RowType.RowField field = fields.get(i);
+            String fieldValue = fieldMap.get(field.getName());
+            if (StringUtils.isBlank(fieldValue)) {
+                // Whitespace-only values are treated like missing values.
+                record.setField(i, null);
+                continue;
+            }
+            switch (field.getType().getTypeRoot()) {
+                case BIGINT:
+                    record.setField(i, Long.valueOf(fieldValue));
+                    break;
+                case BOOLEAN:
+                    record.setField(i, Boolean.valueOf(fieldValue));
+                    break;
+                case DOUBLE:
+                case DECIMAL:
+                    // NOTE(review): Flink's internal type for DECIMAL is DecimalData; a Double
+                    // here may not be accepted by the dialect's row converter - confirm.
+                    record.setField(i, Double.valueOf(fieldValue));
+                    break;
+                case TIME_WITHOUT_TIME_ZONE:
+                case TIMESTAMP_WITHOUT_TIME_ZONE:
+                case INTERVAL_DAY_TIME:
+                    // NOTE(review): only TIMESTAMP is represented as TimestampData internally;
+                    // TIME and INTERVAL_DAY_TIME use numeric representations - confirm.
+                    record.setField(i, TimestampData.fromEpochMillis(Long.valueOf(fieldValue)));
+                    break;
+                case BINARY:
+                    // Flink represents BINARY as byte[]. The previous Arrays.toString(...) stored
+                    // a java.lang.String such as "[97, 98]", which is not the binary payload.
+                    record.setField(i, fieldValue.getBytes(StandardCharsets.UTF_8));
+                    break;
+                default:
+                    record.setField(i, StringData.fromString(fieldValue));
+            }
+        }
+        return record;
+    }
+
+    /**
+     * Clears the row and byte counters once they have been reported to the sink metrics.
+     */
+    private void resetStateAfterFlush() {
+        rowSize = 0L;
+        dataSize = 0L;
+    }
+
+    /**
+     * Stores the current sink metric values in the metric list state. Does nothing when
+     * metrics or the metric state are not set up.
+     */
+    @Override
+    public void snapshotState(FunctionSnapshotContext context) throws Exception {
+        if (sinkMetricData == null || metricStateListState == null) {
+            return;
+        }
+        int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
+        MetricStateUtils.snapshotMetricStateForSinkMetricData(metricStateListState, sinkMetricData, subtaskIndex);
+    }
+
+    /**
+     * Prepares the metric state of this sink.
+     *
+     * <p>When {@code inlongMetric} is configured, the union list state registered under
+     * {@code INLONG_METRIC_STATE_NAME} is obtained from the operator state store. When the
+     * context reports a restore, the previous metric state is read back for this subtask.
+     *
+     * @param context initialization context providing the operator state store
+     * @throws Exception if the state cannot be obtained or restored
+     */
+    @Override
+    public void initializeState(FunctionInitializationContext context) throws Exception {
+        if (this.inlongMetric != null) {
+            ListStateDescriptor<MetricState> metricStateDescriptor = new ListStateDescriptor<>(
+                    INLONG_METRIC_STATE_NAME, TypeInformation.of(new TypeHint<MetricState>() {
+                    }));
+            this.metricStateListState =
+                    context.getOperatorStateStore().getUnionListState(metricStateDescriptor);
+        }
+        if (!context.isRestored()) {
+            return;
+        }
+        // NOTE(review): metricStateListState is still null here when inlongMetric is unset;
+        // confirm that restoreMetricState tolerates a null list state.
+        int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
+        int parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
+        metricState = MetricStateUtils.restoreMetricState(metricStateListState, subtaskIndex, parallelism);
+    }
+
+    /**
+     * Writes all buffered records by delegating to {@link #attemptFlush()} and then resets
+     * {@code batchCount} to zero.
+     *
+     * @throws IOException if {@code checkFlushException()} or {@link #attemptFlush()} throws
+     */
+    @Override
+    public synchronized void flush() throws IOException {
+        // Presumably rethrows a failure recorded by an earlier flush - confirm against
+        // the definition of checkFlushException().
+        checkFlushException();
+        attemptFlush();
+        // Only reached when attemptFlush() returned normally.
+        batchCount = 0;
+    }
+
+    /**
+     * Writes all records from {@code recordsMap} to the database, one table at a time.
+     *
+     * <p>Each table is first written as a single batch. If the batch fails, its records are
+     * rewritten one by one, each record being attempted up to
+     * {@code executionOptions.getMaxRetries()} times with a pause that grows with the attempt
+     * number. A record that still fails has its last exception stored in
+     * {@code tableExceptionMap}. When {@code isIgnoreTableException} is set, the remaining
+     * records of that table are abandoned and the table is skipped by later calls.
+     *
+     * @throws IOException if the thread is interrupted while pausing between attempts
+     */
+    protected void attemptFlush() throws IOException {
+        for (Map.Entry<String, List<GenericRowData>> entry : recordsMap.entrySet()) {
+            String tableIdentifier = entry.getKey();
+            // A table that already failed is skipped entirely when table failures are ignored.
+            if (isIgnoreTableException && null != tableExceptionMap.get(tableIdentifier)) {
+                continue;
+            }
+            List<GenericRowData> tableIdRecordList = entry.getValue();
+            if (CollectionUtils.isEmpty(tableIdRecordList)) {
+                continue;
+            }
+            boolean batchFlushed = false;
+            Exception tableException = null;
+            try {
+                JdbcExec batchExecutor = getOrCreateStatementExecutor(tableIdentifier);
+                for (GenericRowData record : tableIdRecordList) {
+                    batchExecutor.addToBatch((JdbcIn) record);
+                }
+                batchExecutor.executeBatch();
+                batchFlushed = true;
+            } catch (Exception e) {
+                tableException = e;
+                LOG.warn("Flush all data for tableIdentifier:{} get err:", tableIdentifier, e);
+                // The parser expects (errMsg, tableIdentifier); a message may legitimately be null.
+                if (null != e.getMessage()) {
+                    getAndSetPkFromErrMsg(e.getMessage(), tableIdentifier);
+                }
+                updateOneExecutor(true, tableIdentifier);
+                sleepBeforeRetry(1000L, e);
+            }
+
+            if (!batchFlushed) {
+                for (GenericRowData record : tableIdRecordList) {
+                    // Tracked per record so an earlier success cannot hide a later failure.
+                    boolean recordFlushed = false;
+                    for (int retryTimes = 1; retryTimes <= executionOptions.getMaxRetries(); retryTimes++) {
+                        try {
+                            JdbcExec recordExecutor = getOrCreateStatementExecutor(tableIdentifier);
+                            recordExecutor.addToBatch((JdbcIn) record);
+                            recordExecutor.executeBatch();
+                            recordFlushed = true;
+                            break;
+                        } catch (Exception e) {
+                            LOG.error("Flush one record tableIdentifier:{} ,retryTimes:{} get err:",
+                                    tableIdentifier, retryTimes, e);
+                            if (null != e.getMessage()) {
+                                getAndSetPkFromErrMsg(e.getMessage(), tableIdentifier);
+                            }
+                            tableException = e;
+                            updateOneExecutor(true, tableIdentifier);
+                            sleepBeforeRetry(1000L * retryTimes, e);
+                        }
+                    }
+                    if (!recordFlushed && null != tableException) {
+                        LOG.info("Put tableIdentifier:{} exception:{}",
+                                tableIdentifier, tableException.getMessage());
+                        tableExceptionMap.put(tableIdentifier, tableException);
+                        if (isIgnoreTableException) {
+                            LOG.info("Stop write table:{} because occur exception",
+                                    tableIdentifier);
+                            // Leave the record loop: the table is skipped from now on anyway.
+                            break;
+                        }
+                    }
+                }
+            }
+            tableIdRecordList.clear();
+        }
+    }
+
+    /**
+     * Pauses the calling thread before the next write attempt.
+     *
+     * @param millis how long to pause, in milliseconds
+     * @param flushFailure the write failure that triggered the pause; used as the cause if the
+     *        pause is interrupted
+     * @throws IOException if the thread is interrupted; the interrupt flag is restored first
+     */
+    private void sleepBeforeRetry(long millis, Exception flushFailure) throws IOException {
+        try {
+            Thread.sleep(millis);
+        } catch (InterruptedException ex) {
+            Thread.currentThread().interrupt();
+            IOException interrupted = new IOException(
+                    "unable to flush; interrupted while doing another attempt", flushFailure);
+            interrupted.addSuppressed(ex);
+            throw interrupted;
+        }
+    }
+
+    /**
+     * Flushes any still-buffered records and then releases the resources of this instance.
+     *
+     * <p>On the first call the scheduled flush task (if any) is cancelled, pending records are
+     * flushed and the statements of every cached executor are closed. A failure of that final
+     * flush is rethrown only after the executors have been closed and {@code super.close()}
+     * has run, so cleanup is not skipped. Later calls only delegate to {@code super.close()}
+     * and {@code checkFlushException()}.
+     *
+     * @throws RuntimeException if the final flush fails
+     */
+    @Override
+    public synchronized void close() {
+        RuntimeException flushFailure = null;
+        if (!closed) {
+            closed = true;
+
+            if (this.scheduledFuture != null) {
+                scheduledFuture.cancel(false);
+                this.scheduler.shutdown();
+            }
+
+            if (batchCount > 0) {
+                try {
+                    flush();
+                } catch (Exception e) {
+                    LOG.warn("Writing records to JDBC failed.", e);
+                    // Deferred so the cleanup below still runs before the failure is reported.
+                    flushFailure = new RuntimeException("Writing records to JDBC failed.", e);
+                }
+            }
+
+            try {
+                if (null != jdbcExecMap) {
+                    jdbcExecMap.forEach((tableIdentifier, jdbcExec) -> {
+                        try {
+                            jdbcExec.closeStatements();
+                        } catch (SQLException e) {
+                            // Best effort: keep closing the remaining executors.
+                            LOG.error("jdbcExec closeStatements get err", e);
+                        }
+                    });
+                }
+            } catch (Exception e) {
+                LOG.warn("Close JDBC writer failed.", e);
+            }
+        }
+        super.close();
+        if (flushFailure != null) {
+            throw flushFailure;
+        }
+        checkFlushException();
+    }
+
+    /**
+     * Extracts key column names from a database error message and remembers them for a table.
+     *
+     * <p>The message is searched for the fragment {@code Detail: Key (col1, col2)=(}. When it
+     * is found, the comma-separated names between the first pair of parentheses are trimmed
+     * and stored in {@code pkNameMap} under {@code tableIdentifier}.
+     *
+     * @param errMsg error message to inspect; may be {@code null}, in which case nothing is stored
+     * @param tableIdentifier key under which the extracted names are stored
+     * @return {@code true} if key names were found and stored, {@code false} otherwise
+     */
+    public boolean getAndSetPkFromErrMsg(String errMsg, String tableIdentifier) {
+        // Throwable#getMessage() may return null, and Pattern#matcher(null) would throw.
+        if (errMsg == null) {
+            return false;
+        }
+        Pattern pattern = Pattern.compile("Detail: Key \\((.*?)\\)=\\(");
+        Matcher m = pattern.matcher(errMsg);
+        if (!m.find()) {
+            return false;
+        }
+        List<String> pkNameList = new ArrayList<>();
+        for (String pkName : m.group(1).split(",")) {
+            pkNameList.add(pkName.trim());
+        }
+        pkNameMap.put(tableIdentifier, pkNameList);
+        return true;
+    }
+
+    /**
+     * Replaces the cached statement executor of one table when it should no longer be used.
+     *
+     * <p>The executor is replaced when {@code reconnect} is requested, when no connection
+     * provider is registered for the table, or when the provider reports an invalid
+     * connection. If no executor is cached for the table nothing happens. Failures are
+     * logged and not propagated.
+     *
+     * @param reconnect {@code true} to replace the executor regardless of the connection state
+     * @param tableIdentifier table whose executor is examined
+     */
+    public void updateOneExecutor(boolean reconnect, String tableIdentifier) {
+        try {
+            SimpleJdbcConnectionProvider provider = connectionExecProviderMap.get(tableIdentifier);
+            boolean executorStillUsable = !reconnect
+                    && null != provider
+                    && provider.isConnectionValid();
+            if (executorStillUsable) {
+                return;
+            }
+            JdbcExec staleExecutor = jdbcExecMap.get(tableIdentifier);
+            if (null == staleExecutor) {
+                return;
+            }
+            staleExecutor.closeStatements();
+            jdbcExecMap.remove(tableIdentifier);
+            getOrCreateStatementExecutor(tableIdentifier);
+        } catch (SQLException | IOException e) {
+            LOG.error("jdbcExec updateOneExecutor get err", e);
+        }
+    }
+
+    /**
+     * A factory for creating {@link JdbcBatchStatementExecutor} instances.
+     *
+     * <p>It is a {@link Function} from a {@link RuntimeContext} to the executor and is also
+     * {@link Serializable}. The interface declares no members of its own.
+     *
+     * @param <T> The type of executor created by the factory.
+     */
+    public interface StatementExecutorFactory<T extends 
JdbcBatchStatementExecutor<?>>
+            extends
+                Function<RuntimeContext, T>,
+                Serializable {
+
+    }
+
+}
diff --git 
a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicOutputFormatBuilder.java
 
b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicOutputFormatBuilder.java
index 4a0bfa228..531864cd4 100644
--- 
a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicOutputFormatBuilder.java
+++ 
b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicOutputFormatBuilder.java
@@ -37,9 +37,11 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
+import org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy;
 import org.apache.inlong.sort.base.dirty.DirtyOptions;
 import org.apache.inlong.sort.base.dirty.sink.DirtySink;
 import org.apache.inlong.sort.jdbc.internal.JdbcBatchingOutputFormat;
+import org.apache.inlong.sort.jdbc.internal.JdbcMultiBatchingOutputFormat;
 
 import java.io.Serializable;
 import java.util.Arrays;
@@ -67,6 +69,11 @@ public class JdbcDynamicOutputFormatBuilder implements 
Serializable {
     private DataType[] fieldDataTypes;
     private String inlongMetric;
     private String auditHostAndPorts;
+    private String sinkMultipleFormat;
+    private String databasePattern;
+    private String tablePattern;
+    private String schemaPattern;
+    private SchemaUpdateExceptionPolicy schemaUpdateExceptionPolicy;
     private DirtyOptions dirtyOptions;
     private DirtySink<Object> dirtySink;
 
@@ -243,6 +250,32 @@ public class JdbcDynamicOutputFormatBuilder implements 
Serializable {
         return this;
     }
 
+    /**
+     * Sets the multiple-sink format that {@link #buildMulti()} passes to the output format.
+     *
+     * @param sinkMultipleFormat value stored as-is; it is not validated here
+     * @return this builder, to allow call chaining
+     */
+    public JdbcDynamicOutputFormatBuilder setSinkMultipleFormat(String 
sinkMultipleFormat) {
+        this.sinkMultipleFormat = sinkMultipleFormat;
+        return this;
+    }
+
+    /**
+     * Sets the database pattern that {@link #buildMulti()} passes to the output format.
+     *
+     * @param databasePattern value stored as-is; it is not validated here
+     * @return this builder, to allow call chaining
+     */
+    public JdbcDynamicOutputFormatBuilder setDatabasePattern(String 
databasePattern) {
+        this.databasePattern = databasePattern;
+        return this;
+    }
+
+    /**
+     * Sets the table pattern that {@link #buildMulti()} passes to the output format.
+     *
+     * @param tablePattern value stored as-is; it is not validated here
+     * @return this builder, to allow call chaining
+     */
+    public JdbcDynamicOutputFormatBuilder setTablePattern(String tablePattern) 
{
+        this.tablePattern = tablePattern;
+        return this;
+    }
+
+    /**
+     * Sets the schema pattern that {@link #buildMulti()} passes to the output format.
+     *
+     * @param schemaPattern value stored as-is; it is not validated here
+     * @return this builder, to allow call chaining
+     */
+    public JdbcDynamicOutputFormatBuilder setSchemaPattern(String 
schemaPattern) {
+        this.schemaPattern = schemaPattern;
+        return this;
+    }
+
+    /**
+     * Sets the schema-update exception policy that {@link #buildMulti()} passes to the
+     * output format.
+     *
+     * @param schemaUpdateExceptionPolicy value stored as-is; it is not validated here
+     * @return this builder, to allow call chaining
+     */
+    public JdbcDynamicOutputFormatBuilder setSchemaUpdatePolicy(
+            SchemaUpdateExceptionPolicy schemaUpdateExceptionPolicy) {
+        this.schemaUpdateExceptionPolicy = schemaUpdateExceptionPolicy;
+        return this;
+    }
+
     public JdbcDynamicOutputFormatBuilder setDirtyOptions(DirtyOptions 
dirtyOptions) {
         this.dirtyOptions = dirtyOptions;
         return this;
@@ -298,4 +331,23 @@ public class JdbcDynamicOutputFormatBuilder implements 
Serializable {
                     dirtySink);
         }
     }
+
+    /**
+     * Creates the output format used for writing to multiple tables.
+     *
+     * <p>The JDBC, DML and execution options must have been set. The format, the patterns,
+     * the metric settings and the schema-update policy are passed on as they are, possibly
+     * {@code null}.
+     *
+     * @return a new {@link JdbcMultiBatchingOutputFormat} configured from this builder
+     * @throws NullPointerException if a required option is missing
+     */
+    public JdbcMultiBatchingOutputFormat<RowData, ?, ?> buildMulti() {
+        checkNotNull(jdbcOptions, "jdbc options can not be null");
+        checkNotNull(dmlOptions, "jdbc dml options can not be null");
+        checkNotNull(executionOptions, "jdbc execution options can not be null");
+        final SimpleJdbcConnectionProvider connectionProvider =
+                new SimpleJdbcConnectionProvider(jdbcOptions);
+        return new JdbcMultiBatchingOutputFormat<>(
+                connectionProvider,
+                executionOptions,
+                dmlOptions,
+                appendMode,
+                jdbcOptions,
+                sinkMultipleFormat,
+                databasePattern,
+                tablePattern,
+                schemaPattern,
+                inlongMetric,
+                auditHostAndPorts,
+                schemaUpdateExceptionPolicy);
+    }
 }
\ No newline at end of file
diff --git 
a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableFactory.java
 
b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableFactory.java
index 533224f00..e12128ec8 100644
--- 
a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableFactory.java
+++ 
b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableFactory.java
@@ -41,16 +41,25 @@ import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Optional;
 import java.util.Set;
+import org.apache.inlong.sort.base.format.DynamicSchemaFormatFactory;
+import org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy;
+import org.apache.inlong.sort.base.util.JdbcUrlUtils;
+
+import static org.apache.flink.util.Preconditions.checkState;
+import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_ENABLE;
+import static 
org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_DATABASE_PATTERN;
+import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_FORMAT;
+import static 
org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_SCHEMA_UPDATE_POLICY;
+import static 
org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_TABLE_PATTERN;
 
 import org.apache.inlong.sort.base.dirty.DirtyOptions;
 import org.apache.inlong.sort.base.dirty.sink.DirtySink;
 import org.apache.inlong.sort.base.dirty.utils.DirtySinkFactoryUtils;
-import org.apache.inlong.sort.base.util.JdbcUrlUtils;
 
-import static org.apache.flink.util.Preconditions.checkState;
 import static org.apache.inlong.sort.base.Constants.DIRTY_PREFIX;
 import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
 import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
+import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
 
 /**
  * Copy from org.apache.flink:flink-connector-jdbc_2.11:1.13.5
@@ -182,6 +191,14 @@ public class JdbcDynamicTableFactory implements 
DynamicTableSourceFactory, Dynam
                     .defaultValue(false)
                     .withDescription("Whether to support sink update/delete 
data without primaryKey.");
 
+    /**
+     * Optional string option {@code sink.multiple.schema-pattern} without a default value.
+     * {@code createDynamicTableSink} falls back to the database pattern when it is not set.
+     */
+    public static final ConfigOption<String> SINK_MULTIPLE_SCHEMA_PATTERN =
+            ConfigOptions.key("sink.multiple.schema-pattern")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The option 'sink.multiple.schema-pattern' "
+                            + "is used to extract the schema name from the raw binary data, "
+                            + "this is only used in the multiple sink writing scenario.");
+
+
     @Override
     public DynamicTableSink createDynamicTableSink(Context context) {
         final FactoryUtil.TableFactoryHelper helper =
@@ -190,12 +207,20 @@ public class JdbcDynamicTableFactory implements 
DynamicTableSourceFactory, Dynam
 
         helper.validateExcept(DIRTY_PREFIX);
         validateConfigOptions(config);
+        boolean multipleSink = 
config.getOptional(SINK_MULTIPLE_ENABLE).orElse(false);
+        String sinkMultipleFormat = 
helper.getOptions().getOptional(SINK_MULTIPLE_FORMAT).orElse(null);
+        String databasePattern = 
helper.getOptions().getOptional(SINK_MULTIPLE_DATABASE_PATTERN).orElse(null);
+        String tablePattern = 
helper.getOptions().getOptional(SINK_MULTIPLE_TABLE_PATTERN).orElse(null);
+        String schemaPattern = 
helper.getOptions().getOptional(SINK_MULTIPLE_SCHEMA_PATTERN).orElse(databasePattern);
+        validateSinkMultiple(multipleSink, sinkMultipleFormat, 
databasePattern, schemaPattern, tablePattern);
         JdbcOptions jdbcOptions = getJdbcOptions(config);
         TableSchema physicalSchema =
                 
TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
         boolean appendMode = config.get(SINK_APPEND_MODE);
         String inlongMetric = config.getOptional(INLONG_METRIC).orElse(null);
         String auditHostAndPorts = 
config.getOptional(INLONG_AUDIT).orElse(null);
+        SchemaUpdateExceptionPolicy schemaUpdateExceptionPolicy =
+                
helper.getOptions().getOptional(SINK_MULTIPLE_SCHEMA_UPDATE_POLICY).orElse(null);
         // Build the dirty data side-output
         final DirtyOptions dirtyOptions = 
DirtyOptions.fromConfig(helper.getOptions());
         final DirtySink<Object> dirtySink = 
DirtySinkFactoryUtils.createDirtySink(context, dirtyOptions);
@@ -205,8 +230,14 @@ public class JdbcDynamicTableFactory implements 
DynamicTableSourceFactory, Dynam
                 getJdbcDmlOptions(jdbcOptions, physicalSchema),
                 physicalSchema,
                 appendMode,
+                multipleSink,
+                sinkMultipleFormat,
+                databasePattern,
+                tablePattern,
+                schemaPattern,
                 inlongMetric,
                 auditHostAndPorts,
+                schemaUpdateExceptionPolicy,
                 dirtyOptions,
                 dirtySink);
     }
@@ -228,6 +259,26 @@ public class JdbcDynamicTableFactory implements 
DynamicTableSourceFactory, Dynam
                 physicalSchema);
     }
 
+    /**
+     * Validates the options of the multiple-sink mode.
+     *
+     * <p>Nothing is checked when {@code multipleSink} is {@code false}. Otherwise the
+     * database, schema and table patterns and the format must be present and not blank, and
+     * the format must be one of {@code DynamicSchemaFormatFactory.SUPPORT_FORMATS}.
+     *
+     * @param multipleSink whether the multiple-sink mode is enabled
+     * @param sinkMultipleFormat value of 'sink.multiple.format'
+     * @param databasePattern value of 'sink.multiple.database-pattern'
+     * @param schemaPattern value of 'sink.multiple.schema-pattern'
+     * @param tablePattern value of 'sink.multiple.table-pattern'
+     * @throws NullPointerException if a required option is {@code null}
+     * @throws IllegalArgumentException if a required option is blank or the format is unsupported
+     */
+    private void validateSinkMultiple(boolean multipleSink, String sinkMultipleFormat,
+            String databasePattern, String schemaPattern, String tablePattern) {
+        // multipleSink is a primitive and can never be null, so only its value matters.
+        if (!multipleSink) {
+            return;
+        }
+        checkOptionNotBlank(databasePattern, "The option 'sink.multiple.database-pattern'"
+                + " is not allowed blank when the option 'sink.multiple.enable' is 'true'");
+        checkOptionNotBlank(schemaPattern, "The option 'sink.multiple.schema-pattern'"
+                + " is not allowed blank when the option 'sink.multiple.enable' is 'true'");
+        checkOptionNotBlank(tablePattern, "The option 'sink.multiple.table-pattern' "
+                + "is not allowed blank when the option 'sink.multiple.enable' is 'true'");
+        checkOptionNotBlank(sinkMultipleFormat, "The option 'sink.multiple.format' "
+                + "is not allowed blank when the option 'sink.multiple.enable' is 'true'");
+        DynamicSchemaFormatFactory.getFormat(sinkMultipleFormat);
+        Set<String> supportFormats = DynamicSchemaFormatFactory.SUPPORT_FORMATS.keySet();
+        Preconditions.checkArgument(supportFormats.contains(sinkMultipleFormat), String.format(
+                "Unsupported value '%s' for '%s'. Supported values are %s.",
+                sinkMultipleFormat, SINK_MULTIPLE_FORMAT.key(), supportFormats));
+    }
+
+    /**
+     * Rejects an option value that is {@code null} or contains only whitespace.
+     *
+     * @param value option value to check
+     * @param errorMessage message of the exception thrown on violation
+     */
+    private static void checkOptionNotBlank(String value, String errorMessage) {
+        Preconditions.checkNotNull(value, errorMessage);
+        Preconditions.checkArgument(!value.trim().isEmpty(), errorMessage);
+    }
+
+
     private JdbcOptions getJdbcOptions(ReadableConfig readableConfig) {
         String url = 
JdbcUrlUtils.replaceInvalidUrlProperty(readableConfig.get(URL));
         Optional<String> dialectImplOptional = 
readableConfig.getOptional(DIALECT_IMPL);
@@ -334,6 +385,12 @@ public class JdbcDynamicTableFactory implements 
DynamicTableSourceFactory, Dynam
         optionalOptions.add(FactoryUtil.SINK_PARALLELISM);
         optionalOptions.add(MAX_RETRY_TIMEOUT);
         optionalOptions.add(DIALECT_IMPL);
+        optionalOptions.add(SINK_MULTIPLE_ENABLE);
+        optionalOptions.add(SINK_MULTIPLE_FORMAT);
+        optionalOptions.add(SINK_MULTIPLE_DATABASE_PATTERN);
+        optionalOptions.add(SINK_MULTIPLE_TABLE_PATTERN);
+        optionalOptions.add(SINK_MULTIPLE_SCHEMA_PATTERN);
+        optionalOptions.add(SINK_MULTIPLE_SCHEMA_UPDATE_POLICY);
         optionalOptions.add(INLONG_METRIC);
         optionalOptions.add(INLONG_AUDIT);
         return optionalOptions;
diff --git 
a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableSink.java
 
b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableSink.java
index e214e61d1..d3098ee11 100644
--- 
a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableSink.java
+++ 
b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableSink.java
@@ -28,6 +28,7 @@ import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.sink.SinkFunctionProvider;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.types.RowKind;
+import org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy;
 import org.apache.inlong.sort.base.dirty.DirtyOptions;
 import org.apache.inlong.sort.base.dirty.sink.DirtySink;
 import org.apache.inlong.sort.jdbc.internal.GenericJdbcSinkFunction;
@@ -52,9 +53,15 @@ public class JdbcDynamicTableSink implements 
DynamicTableSink {
     private final TableSchema tableSchema;
     private final String dialectName;
 
+    private final boolean multipleSink;
     private final String inlongMetric;
     private final String auditHostAndPorts;
     private final boolean appendMode;
+    private final String sinkMultipleFormat;
+    private final String databasePattern;
+    private final String tablePattern;
+    private final String schemaPattern;
+    private final SchemaUpdateExceptionPolicy schemaUpdateExceptionPolicy;
 
     private final DirtyOptions dirtyOptions;
     private @Nullable final DirtySink<Object> dirtySink;
@@ -65,8 +72,14 @@ public class JdbcDynamicTableSink implements 
DynamicTableSink {
             JdbcDmlOptions dmlOptions,
             TableSchema tableSchema,
             boolean appendMode,
+            boolean multipleSink,
+            String sinkMultipleFormat,
+            String databasePattern,
+            String tablePattern,
+            String schemaPattern,
             String inlongMetric,
             String auditHostAndPorts,
+            SchemaUpdateExceptionPolicy schemaUpdateExceptionPolicy,
             DirtyOptions dirtyOptions,
             @Nullable DirtySink<Object> dirtySink) {
         this.jdbcOptions = jdbcOptions;
@@ -75,15 +88,23 @@ public class JdbcDynamicTableSink implements 
DynamicTableSink {
         this.tableSchema = tableSchema;
         this.dialectName = dmlOptions.getDialect().dialectName();
         this.appendMode = appendMode;
+        this.multipleSink = multipleSink;
+        this.sinkMultipleFormat = sinkMultipleFormat;
+        this.databasePattern = databasePattern;
+        this.tablePattern = tablePattern;
+        this.schemaPattern = schemaPattern;
         this.inlongMetric = inlongMetric;
         this.auditHostAndPorts = auditHostAndPorts;
+        this.schemaUpdateExceptionPolicy = schemaUpdateExceptionPolicy;
         this.dirtyOptions = dirtyOptions;
         this.dirtySink = dirtySink;
     }
 
     @Override
     public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
-        validatePrimaryKey(requestedMode);
+        if (!multipleSink) {
+            validatePrimaryKey(requestedMode);
+        }
         return ChangelogMode.newBuilder()
                 .addContainedKind(RowKind.INSERT)
                 .addContainedKind(RowKind.DELETE)
@@ -103,25 +124,37 @@ public class JdbcDynamicTableSink implements 
DynamicTableSink {
         final TypeInformation<RowData> rowDataTypeInformation =
                 context.createTypeInformation(tableSchema.toRowDataType());
         final JdbcDynamicOutputFormatBuilder builder = new 
JdbcDynamicOutputFormatBuilder();
-
         builder.setAppendMode(appendMode);
         builder.setJdbcOptions(jdbcOptions);
         builder.setJdbcDmlOptions(dmlOptions);
         builder.setJdbcExecutionOptions(executionOptions);
-        builder.setRowDataTypeInfo(rowDataTypeInformation);
-        builder.setFieldDataTypes(tableSchema.getFieldDataTypes());
         builder.setInLongMetric(inlongMetric);
         builder.setAuditHostAndPorts(auditHostAndPorts);
         builder.setDirtyOptions(dirtyOptions);
         builder.setDirtySink(dirtySink);
-        return SinkFunctionProvider.of(
-                new GenericJdbcSinkFunction<>(builder.build()), 
jdbcOptions.getParallelism());
+        if (multipleSink) {
+            builder.setSinkMultipleFormat(sinkMultipleFormat);
+            builder.setDatabasePattern(databasePattern);
+            builder.setTablePattern(tablePattern);
+            builder.setSchemaPattern(schemaPattern);
+            builder.setSchemaUpdatePolicy(schemaUpdateExceptionPolicy);
+            return SinkFunctionProvider.of(
+                    new GenericJdbcSinkFunction<>(builder.buildMulti()), 
jdbcOptions.getParallelism());
+        } else {
+            builder.setRowDataTypeInfo(rowDataTypeInformation);
+            builder.setFieldDataTypes(tableSchema.getFieldDataTypes());
+            return SinkFunctionProvider.of(
+                    new GenericJdbcSinkFunction<>(builder.build()), 
jdbcOptions.getParallelism());
+        }
     }
 
     @Override
     public DynamicTableSink copy() {
         return new JdbcDynamicTableSink(jdbcOptions, executionOptions, 
dmlOptions,
-                tableSchema, appendMode, inlongMetric, auditHostAndPorts, 
dirtyOptions, dirtySink);
+                tableSchema, appendMode, multipleSink, sinkMultipleFormat,
+                databasePattern, tablePattern, schemaPattern,
+                inlongMetric, auditHostAndPorts,
+                schemaUpdateExceptionPolicy, dirtyOptions, dirtySink);
     }
 
     @Override

Reply via email to