This is an automated email from the ASF dual-hosted git repository. zirui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 3291aeb26 [INLONG-6819][Sort] Add multi table sink for PostgresSQL (#6820) 3291aeb26 is described below commit 3291aeb260cdbea5bdba86f9a044dc08fd157d2a Author: kuansix <490305...@qq.com> AuthorDate: Fri Dec 16 19:22:18 2022 +0800 [INLONG-6819][Sort] Add multi table sink for PostgresSQL (#6820) --- .../JdbcMultiBatchingComm.java} | 139 +---- .../internal/JdbcMultiBatchingOutputFormat.java | 639 +++++++++++++++++++++ .../jdbc/table/JdbcDynamicOutputFormatBuilder.java | 52 ++ .../sort/jdbc/table/JdbcDynamicTableFactory.java | 61 +- .../sort/jdbc/table/JdbcDynamicTableSink.java | 47 +- 5 files changed, 797 insertions(+), 141 deletions(-) diff --git a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicOutputFormatBuilder.java b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcMultiBatchingComm.java similarity index 60% copy from inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicOutputFormatBuilder.java copy to inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcMultiBatchingComm.java index 4a0bfa228..b22ac81b4 100644 --- a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicOutputFormatBuilder.java +++ b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcMultiBatchingComm.java @@ -15,14 +15,12 @@ * limitations under the License. */ -package org.apache.inlong.sort.jdbc.table; +package org.apache.inlong.sort.jdbc.internal; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.connector.jdbc.JdbcExecutionOptions; import org.apache.flink.connector.jdbc.dialect.JdbcDialect; -import org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider; import org.apache.flink.connector.jdbc.internal.converter.JdbcRowConverter; import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor; import org.apache.flink.connector.jdbc.internal.executor.TableBufferReducedStatementExecutor; @@ -30,51 +28,24 @@ import org.apache.flink.connector.jdbc.internal.executor.TableBufferedStatementE import org.apache.flink.connector.jdbc.internal.executor.TableInsertOrUpdateStatementExecutor; import org.apache.flink.connector.jdbc.internal.executor.TableSimpleStatementExecutor; import org.apache.flink.connector.jdbc.internal.options.JdbcDmlOptions; -import org.apache.flink.connector.jdbc.internal.options.JdbcOptions; import org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatement; import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; -import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.RowType; -import org.apache.inlong.sort.base.dirty.DirtyOptions; -import org.apache.inlong.sort.base.dirty.sink.DirtySink; -import org.apache.inlong.sort.jdbc.internal.JdbcBatchingOutputFormat; -import java.io.Serializable; import java.util.Arrays; import java.util.function.Function; import static org.apache.flink.table.data.RowData.createFieldGetter; import static org.apache.flink.util.Preconditions.checkArgument; -import static org.apache.flink.util.Preconditions.checkNotNull; /** - * Copy from org.apache.flink:flink-connector-jdbc_2.11:1.13.5 - * - * Builder for {@link JdbcBatchingOutputFormat} for Table/SQL. - * Add an option `sink.ignore.changelog` to support insert-only mode without primaryKey. + * Comm function for A JDBC multi-table outputFormat to get or create JDBC Executor */ -public class JdbcDynamicOutputFormatBuilder implements Serializable { - - private static final long serialVersionUID = 1L; - - private JdbcOptions jdbcOptions; - private JdbcExecutionOptions executionOptions; - private JdbcDmlOptions dmlOptions; - private boolean appendMode; - private TypeInformation<RowData> rowDataTypeInformation; - private DataType[] fieldDataTypes; - private String inlongMetric; - private String auditHostAndPorts; - private DirtyOptions dirtyOptions; - private DirtySink<Object> dirtySink; - - public JdbcDynamicOutputFormatBuilder() { +public class JdbcMultiBatchingComm { - } - - private static JdbcBatchStatementExecutor<RowData> createBufferReduceExecutor( + public static JdbcBatchStatementExecutor<RowData> createBufferReduceExecutor( JdbcDmlOptions opt, RuntimeContext ctx, TypeInformation<RowData> rowDataTypeInfo, @@ -95,6 +66,7 @@ public class JdbcDynamicOutputFormatBuilder implements Serializable { ctx.getExecutionConfig().isObjectReuseEnabled() ? typeSerializer::copy : Function.identity(); + return new TableBufferReducedStatementExecutor( createUpsertRowExecutor( dialect, @@ -109,7 +81,7 @@ public class JdbcDynamicOutputFormatBuilder implements Serializable { valueTransform); } - private static JdbcBatchStatementExecutor<RowData> createSimpleBufferedExecutor( + public static JdbcBatchStatementExecutor<RowData> createSimpleBufferedExecutor( RuntimeContext ctx, JdbcDialect dialect, String[] fieldNames, @@ -201,101 +173,4 @@ public class JdbcDynamicOutputFormatBuilder implements Serializable { return pkRow; } - public JdbcDynamicOutputFormatBuilder setAppendMode(boolean appendMode) { - this.appendMode = appendMode; - return this; - } - - public JdbcDynamicOutputFormatBuilder setJdbcOptions(JdbcOptions jdbcOptions) { - this.jdbcOptions = jdbcOptions; - return this; - } - - public JdbcDynamicOutputFormatBuilder setJdbcExecutionOptions( - JdbcExecutionOptions executionOptions) { - this.executionOptions = executionOptions; - return this; - } - - public JdbcDynamicOutputFormatBuilder setJdbcDmlOptions(JdbcDmlOptions dmlOptions) { - this.dmlOptions = dmlOptions; - return this; - } - - public JdbcDynamicOutputFormatBuilder setRowDataTypeInfo( - TypeInformation<RowData> rowDataTypeInfo) { - this.rowDataTypeInformation = rowDataTypeInfo; - return this; - } - - public JdbcDynamicOutputFormatBuilder setFieldDataTypes(DataType[] fieldDataTypes) { - this.fieldDataTypes = fieldDataTypes; - return this; - } - - public JdbcDynamicOutputFormatBuilder setInLongMetric(String inlongMetric) { - this.inlongMetric = inlongMetric; - return this; - } - - public JdbcDynamicOutputFormatBuilder setAuditHostAndPorts(String auditHostAndPorts) { - this.auditHostAndPorts = auditHostAndPorts; - return this; - } - - public JdbcDynamicOutputFormatBuilder setDirtyOptions(DirtyOptions dirtyOptions) { - this.dirtyOptions = dirtyOptions; - return this; - } - - public JdbcDynamicOutputFormatBuilder setDirtySink(DirtySink<Object> dirtySink) { - this.dirtySink = dirtySink; - return this; - } - - public JdbcBatchingOutputFormat<RowData, ?, ?> build() { - checkNotNull(jdbcOptions, "jdbc options can not be null"); - checkNotNull(dmlOptions, "jdbc dml options can not be null"); - checkNotNull(executionOptions, "jdbc execution options can not be null"); - - final LogicalType[] logicalTypes = - Arrays.stream(fieldDataTypes) - .map(DataType::getLogicalType) - .toArray(LogicalType[]::new); - if (dmlOptions.getKeyFields().isPresent() && dmlOptions.getKeyFields().get().length > 0 && !appendMode) { - // upsert query - return new JdbcBatchingOutputFormat<>( - new SimpleJdbcConnectionProvider(jdbcOptions), - executionOptions, - ctx -> createBufferReduceExecutor( - dmlOptions, ctx, rowDataTypeInformation, logicalTypes), - JdbcBatchingOutputFormat.RecordExtractor.identity(), - inlongMetric, - auditHostAndPorts, - dirtyOptions, - dirtySink); - } else { - // append only query - final String sql = - dmlOptions - .getDialect() - .getInsertIntoStatement( - dmlOptions.getTableName(), dmlOptions.getFieldNames()); - return new JdbcBatchingOutputFormat<>( - new SimpleJdbcConnectionProvider(jdbcOptions), - executionOptions, - ctx -> createSimpleBufferedExecutor( - ctx, - dmlOptions.getDialect(), - dmlOptions.getFieldNames(), - logicalTypes, - sql, - rowDataTypeInformation), - JdbcBatchingOutputFormat.RecordExtractor.identity(), - inlongMetric, - auditHostAndPorts, - dirtyOptions, - dirtySink); - } - } -} \ No newline at end of file +} diff --git a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcMultiBatchingOutputFormat.java b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcMultiBatchingOutputFormat.java new file mode 100644 index 000000000..069d41f31 --- /dev/null +++ b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcMultiBatchingOutputFormat.java @@ -0,0 +1,639 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.jdbc.internal; + +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.connector.jdbc.JdbcExecutionOptions; +import org.apache.flink.connector.jdbc.internal.connection.JdbcConnectionProvider; +import org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider; +import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor; +import org.apache.flink.connector.jdbc.internal.options.JdbcDmlOptions; +import org.apache.flink.connector.jdbc.internal.options.JdbcOptions; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.runtime.util.ExecutorThreadFactory; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.types.RowKind; +import org.apache.inlong.sort.base.format.DynamicSchemaFormatFactory; +import org.apache.inlong.sort.base.format.JsonDynamicSchemaFormat; +import org.apache.inlong.sort.base.metric.MetricOption; +import org.apache.inlong.sort.base.metric.MetricState; +import org.apache.inlong.sort.base.metric.SinkMetricData; +import org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy; +import org.apache.inlong.sort.base.util.MetricStateUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; +import java.io.IOException; +import java.io.Serializable; +import java.nio.charset.StandardCharsets; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT; +import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT; +import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME; + +/** + * A JDBC multi-table outputFormat that supports batching records before writing records to databases. + * Add an option `inlong.metric` to support metrics. + */ +public class JdbcMultiBatchingOutputFormat<In, JdbcIn, JdbcExec extends JdbcBatchStatementExecutor<JdbcIn>> + extends + AbstractJdbcOutputFormat<In> { + + private static final long serialVersionUID = 1L; + private static final Logger LOG = LoggerFactory.getLogger(JdbcMultiBatchingOutputFormat.class); + private final JdbcExecutionOptions executionOptions; + private final String inlongMetric; + private final String auditHostAndPorts; + private transient int batchCount = 0; + private transient volatile boolean closed = false; + private transient ScheduledExecutorService scheduler; + private transient ScheduledFuture<?> scheduledFuture; + private transient RuntimeContext runtimeContext; + private JdbcDmlOptions dmlOptions; + private JdbcOptions jdbcOptions; + private boolean appendMode; + private transient Map<String, JdbcExec> jdbcExecMap = new HashMap<>(); + private transient Map<String, SimpleJdbcConnectionProvider> connectionExecProviderMap = new HashMap<>(); + private transient Map<String, RowType> rowTypeMap = new HashMap<>(); + private transient Map<String, List<String>> pkNameMap = new HashMap<>(); + private transient Map<String, List<GenericRowData>> recordsMap = new HashMap<>(); + private transient Map<String, Exception> tableExceptionMap = new HashMap<>(); + private transient Boolean isIgnoreTableException; + + private transient ListState<MetricState> metricStateListState; + private final String sinkMultipleFormat; + private final String databasePattern; + private final String tablePattern; + private final String schemaPattern; + private transient MetricState metricState; + private SinkMetricData sinkMetricData; + private Long dataSize = 0L; + private Long rowSize = 0L; + private final SchemaUpdateExceptionPolicy schemaUpdateExceptionPolicy; + + public JdbcMultiBatchingOutputFormat( + @Nonnull JdbcConnectionProvider connectionProvider, + @Nonnull JdbcExecutionOptions executionOptions, + @Nonnull JdbcDmlOptions dmlOptions, + @Nonnull boolean appendMode, + @Nonnull JdbcOptions jdbcOptions, + String sinkMultipleFormat, + String databasePattern, + String tablePattern, + String schemaPattern, + String inlongMetric, + String auditHostAndPorts, + SchemaUpdateExceptionPolicy schemaUpdateExceptionPolicy) { + super(connectionProvider); + this.executionOptions = checkNotNull(executionOptions); + this.dmlOptions = dmlOptions; + this.appendMode = appendMode; + this.jdbcOptions = jdbcOptions; + this.sinkMultipleFormat = sinkMultipleFormat; + this.databasePattern = databasePattern; + this.tablePattern = tablePattern; + this.schemaPattern = schemaPattern; + this.inlongMetric = inlongMetric; + this.auditHostAndPorts = auditHostAndPorts; + this.schemaUpdateExceptionPolicy = schemaUpdateExceptionPolicy; + } + + /** + * Connects to the target database and initializes the prepared statement. + * + * @param taskNumber The number of the parallel instance. + */ + @Override + public void open(int taskNumber, int numTasks) throws IOException { + this.runtimeContext = getRuntimeContext(); + MetricOption metricOption = MetricOption.builder() + .withInlongLabels(inlongMetric) + .withInlongAudit(auditHostAndPorts) + .withInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_OUT) : 0L) + .withInitBytes(metricState != null ? metricState.getMetricValue(NUM_BYTES_OUT) : 0L) + .withRegisterMetric(MetricOption.RegisteredMetric.ALL) + .build(); + if (metricOption != null) { + sinkMetricData = new SinkMetricData(metricOption, runtimeContext.getMetricGroup()); + } + jdbcExecMap = new HashMap<>(); + connectionExecProviderMap = new HashMap<>(); + pkNameMap = new HashMap<>(); + rowTypeMap = new HashMap<>(); + recordsMap = new HashMap<>(); + tableExceptionMap = new HashMap<>(); + isIgnoreTableException = schemaUpdateExceptionPolicy.equals(SchemaUpdateExceptionPolicy.ALERT_WITH_IGNORE) + || schemaUpdateExceptionPolicy.equals(SchemaUpdateExceptionPolicy.STOP_PARTIAL); + if (executionOptions.getBatchIntervalMs() != 0 && executionOptions.getBatchSize() != 1) { + this.scheduler = + Executors.newScheduledThreadPool( + 1, new ExecutorThreadFactory("jdbc-upsert-output-format")); + this.scheduledFuture = + this.scheduler.scheduleWithFixedDelay( + () -> { + synchronized (JdbcMultiBatchingOutputFormat.this) { + if (!closed) { + try { + flush(); + if (sinkMetricData != null) { + sinkMetricData.invoke(rowSize, dataSize); + } + resetStateAfterFlush(); + } catch (Exception e) { + if (sinkMetricData != null) { + sinkMetricData.invokeDirty(rowSize, dataSize); + } + resetStateAfterFlush(); + } + } + } + }, + executionOptions.getBatchIntervalMs(), + executionOptions.getBatchIntervalMs(), + TimeUnit.MILLISECONDS); + } + } + + /** + * get or create StatementExecutor for one table. + * + * @param tableIdentifier The table identifier for which to get statementExecutor. + */ + private JdbcExec getOrCreateStatementExecutor( + String tableIdentifier) throws IOException { + if (StringUtils.isBlank(tableIdentifier)) { + return null; + } + JdbcExec jdbcExec = jdbcExecMap.get(tableIdentifier); + if (null != jdbcExec) { + return jdbcExec; + } + RowType rowType = rowTypeMap.get(tableIdentifier); + LogicalType[] logicalTypes = rowType.getFields().stream() + .map(RowType.RowField::getType) + .toArray(LogicalType[]::new); + String[] filedNames = rowType.getFields().stream() + .map(RowType.RowField::getName) + .toArray(String[]::new); + TypeInformation<RowData> rowDataTypeInfo = InternalTypeInfo.of(rowType); + List<String> pkNameList = null; + if (null != pkNameMap.get(tableIdentifier)) { + pkNameList = pkNameMap.get(tableIdentifier); + } + StatementExecutorFactory<JdbcExec> statementExecutorFactory = null; + if (CollectionUtils.isNotEmpty(pkNameList) && !appendMode) { + // upsert query + JdbcDmlOptions createDmlOptions = JdbcDmlOptions.builder() + .withTableName(getTbNameFromIdentifier(tableIdentifier)) + .withDialect(jdbcOptions.getDialect()) + .withFieldNames(filedNames) + .withKeyFields(pkNameList.toArray(new String[pkNameList.size()])) + .build(); + statementExecutorFactory = ctx -> (JdbcExec) JdbcMultiBatchingComm.createBufferReduceExecutor( + createDmlOptions, ctx, rowDataTypeInfo, logicalTypes); + + } else { + // append only query + final String sql = dmlOptions + .getDialect() + .getInsertIntoStatement(getTbNameFromIdentifier(tableIdentifier), filedNames); + statementExecutorFactory = ctx -> (JdbcExec) JdbcMultiBatchingComm.createSimpleBufferedExecutor( + ctx, + dmlOptions.getDialect(), + filedNames, + logicalTypes, + sql, + rowDataTypeInfo); + } + + jdbcExec = statementExecutorFactory.apply(getRuntimeContext()); + try { + JdbcOptions jdbcExecOptions = + JdbcOptions.builder() + .setDBUrl(jdbcOptions.getDbURL() + "/" + getTDbNameFromIdentifier(tableIdentifier)) + .setTableName(getTbNameFromIdentifier(tableIdentifier)) + .setDialect(jdbcOptions.getDialect()) + .setParallelism(jdbcOptions.getParallelism()) + .setConnectionCheckTimeoutSeconds(jdbcOptions.getConnectionCheckTimeoutSeconds()) + .setDriverName(jdbcOptions.getDriverName()) + .setUsername(jdbcOptions.getUsername().orElse("")) + .setPassword(jdbcOptions.getPassword().orElse("")) + .build(); + SimpleJdbcConnectionProvider tableConnectionProvider = new SimpleJdbcConnectionProvider(jdbcExecOptions); + try { + tableConnectionProvider.getOrEstablishConnection(); + } catch (Exception e) { + LOG.error("unable to open JDBC writer, tableIdentifier:{} err:", tableIdentifier, e); + return null; + } + connectionExecProviderMap.put(tableIdentifier, tableConnectionProvider); + jdbcExec.prepareStatements(tableConnectionProvider.getConnection()); + } catch (Exception e) { + return null; + } + jdbcExecMap.put(tableIdentifier, jdbcExec); + return jdbcExec; + } + + /** + * Get table name From tableIdentifier + * tableIdentifier maybe: ${dbName}.${tbName} or ${dbName}.${schemaName}.${tbName} + * + * @param tableIdentifier The table identifier for which to get table name. + */ + private String getTbNameFromIdentifier(String tableIdentifier) { + String[] fileArray = tableIdentifier.split("\\."); + if (2 == fileArray.length) { + return fileArray[1]; + } + if (3 == fileArray.length) { + return fileArray[1] + "." + fileArray[2]; + } + return null; + } + + private String getTDbNameFromIdentifier(String tableIdentifier) { + String[] fileArray = tableIdentifier.split("\\."); + return fileArray[0]; + } + + private void checkFlushException() { + if (schemaUpdateExceptionPolicy.equals(SchemaUpdateExceptionPolicy.THROW_WITH_STOP) + && !tableExceptionMap.isEmpty()) { + String tableErr = "Writing table get failed, tables are:"; + for (Map.Entry<String, Exception> entry : tableExceptionMap.entrySet()) { + LOG.error("Writing table:{} get err:{}", entry.getKey(), entry.getValue().getMessage()); + tableErr = tableErr + entry.getKey() + ","; + } + throw new RuntimeException(tableErr.substring(0, tableErr.length() - 1)); + } + } + + /** + * Extract and write record to recordsMap(buffer) + * + * @param row The record to write. + */ + @Override + public final synchronized void writeRecord(In row) throws IOException { + checkFlushException(); + JsonDynamicSchemaFormat jsonDynamicSchemaFormat = + (JsonDynamicSchemaFormat) DynamicSchemaFormatFactory.getFormat(sinkMultipleFormat); + if (row instanceof RowData) { + RowData rowData = (RowData) row; + JsonNode rootNode = jsonDynamicSchemaFormat.deserialize(rowData.getBinary(0)); + String tableIdentifier = null; + try { + if (StringUtils.isBlank(schemaPattern)) { + tableIdentifier = StringUtils.join( + jsonDynamicSchemaFormat.parse(rootNode, databasePattern), ".", + jsonDynamicSchemaFormat.parse(rootNode, tablePattern)); + } else { + tableIdentifier = StringUtils.join( + jsonDynamicSchemaFormat.parse(rootNode, databasePattern), ".", + jsonDynamicSchemaFormat.parse(rootNode, schemaPattern), ".", + jsonDynamicSchemaFormat.parse(rootNode, tablePattern)); + } + } catch (Exception e) { + LOG.info("Cal tableIdentifier get Exception:", e); + return; + } + rowSize++; + dataSize = dataSize + rootNode.toString().getBytes(StandardCharsets.UTF_8).length; + + GenericRowData record = null; + try { + RowType rowType = jsonDynamicSchemaFormat.extractSchema(rootNode); + if (rowType != null) { + if (null != rowTypeMap.get(tableIdentifier)) { + if (!rowType.equals(rowTypeMap.get(tableIdentifier))) { + attemptFlush(); + rowTypeMap.put(tableIdentifier, rowType); + updateOneExecutor(true, tableIdentifier); + } + } else { + rowTypeMap.put(tableIdentifier, rowType); + } + } + List<String> pkNameList = jsonDynamicSchemaFormat.extractPrimaryKeyNames(rootNode); + pkNameMap.put(tableIdentifier, pkNameList); + JsonNode physicalData = jsonDynamicSchemaFormat.getPhysicalData(rootNode); + List<Map<String, String>> physicalDataList = jsonDynamicSchemaFormat.jsonNode2Map(physicalData); + record = generateRecord(rowType, physicalDataList.get(0)); + List<RowKind> rowKinds = jsonDynamicSchemaFormat + .opType2RowKind(jsonDynamicSchemaFormat.getOpType(rootNode)); + record.setRowKind(rowKinds.get(rowKinds.size() - 1)); + } catch (Exception e) { + LOG.warn("Extract schema failed", e); + return; + } + try { + recordsMap.computeIfAbsent(tableIdentifier, k -> new ArrayList<>()) + .add(record); + batchCount++; + if (executionOptions.getBatchSize() > 0 + && batchCount >= executionOptions.getBatchSize()) { + flush(); + if (sinkMetricData != null) { + sinkMetricData.invoke(rowSize, dataSize); + } + resetStateAfterFlush(); + } + } catch (Exception e) { + if (sinkMetricData != null) { + sinkMetricData.invokeDirty(rowSize, dataSize); + } + resetStateAfterFlush(); + throw new IOException("Writing records to JDBC failed.", e); + } + } + } + + /** + * Convert fieldMap(data) to GenericRowData with rowType(schema) + */ + protected GenericRowData generateRecord(RowType rowType, Map<String, String> fieldMap) { + String[] fieldNames = rowType.getFieldNames().toArray(new String[0]); + int arity = fieldNames.length; + GenericRowData record = new GenericRowData(arity); + for (int i = 0; i < arity; i++) { + String fieldName = fieldNames[i]; + String fieldValue = fieldMap.get(fieldName); + if (StringUtils.isBlank(fieldValue)) { + record.setField(i, null); + } else { + switch (rowType.getFields().get(i).getType().getTypeRoot()) { + case BIGINT: + record.setField(i, Long.valueOf(fieldValue)); + break; + case BOOLEAN: + record.setField(i, Boolean.valueOf(fieldValue)); + break; + case DOUBLE: + case DECIMAL: + record.setField(i, Double.valueOf(fieldValue)); + break; + case TIME_WITHOUT_TIME_ZONE: + case TIMESTAMP_WITHOUT_TIME_ZONE: + case INTERVAL_DAY_TIME: + TimestampData timestampData = TimestampData.fromEpochMillis(Long.valueOf(fieldValue)); + record.setField(i, timestampData); + break; + case BINARY: + record.setField(i, Arrays.toString(fieldValue.getBytes(StandardCharsets.UTF_8))); + break; + default: + record.setField(i, StringData.fromString(fieldValue)); + } + } + } + return record; + } + + private void resetStateAfterFlush() { + dataSize = 0L; + rowSize = 0L; + } + + @Override + public void snapshotState(FunctionSnapshotContext context) throws Exception { + if (sinkMetricData != null && metricStateListState != null) { + MetricStateUtils.snapshotMetricStateForSinkMetricData(metricStateListState, sinkMetricData, + getRuntimeContext().getIndexOfThisSubtask()); + } + } + + @Override + public void initializeState(FunctionInitializationContext context) throws Exception { + if (this.inlongMetric != null) { + this.metricStateListState = context.getOperatorStateStore().getUnionListState( + new ListStateDescriptor<>( + INLONG_METRIC_STATE_NAME, TypeInformation.of(new TypeHint<MetricState>() { + }))); + + } + if (context.isRestored()) { + metricState = MetricStateUtils.restoreMetricState(metricStateListState, + getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks()); + } + } + + @Override + public synchronized void flush() throws IOException { + checkFlushException(); + attemptFlush(); + batchCount = 0; + } + + /** + * Write all recorde from recordsMap to db + * + * First batch writing. + * If batch-writing occur exception, then rewrite one-by-one retry-times set by user. + */ + protected void attemptFlush() throws IOException { + for (Map.Entry<String, List<GenericRowData>> entry : recordsMap.entrySet()) { + String tableIdentifier = entry.getKey(); + boolean isIgnoreTableIdentifierException = isIgnoreTableException + && (null != tableExceptionMap.get(tableIdentifier)); + if (isIgnoreTableIdentifierException) { + continue; + } + List<GenericRowData> tableIdRecordList = entry.getValue(); + if (CollectionUtils.isEmpty(tableIdRecordList)) { + continue; + } + JdbcExec jdbcStatementExecutor = null; + Boolean flushFlag = false; + Exception tableException = null; + try { + jdbcStatementExecutor = getOrCreateStatementExecutor(tableIdentifier); + for (GenericRowData record : tableIdRecordList) { + jdbcStatementExecutor.addToBatch((JdbcIn) record); + } + jdbcStatementExecutor.executeBatch(); + flushFlag = true; + } catch (Exception e) { + tableException = e; + LOG.warn("Flush all data for tableIdentifier:{} get err:", tableIdentifier, e); + getAndSetPkFromErrMsg(tableIdentifier, e.getMessage()); + updateOneExecutor(true, tableIdentifier); + try { + Thread.sleep(1000); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + throw new IOException( + "unable to flush; interrupted while doing another attempt", e); + } + } + + if (!flushFlag) { + for (GenericRowData record : tableIdRecordList) { + for (int retryTimes = 1; retryTimes <= executionOptions.getMaxRetries(); retryTimes++) { + try { + jdbcStatementExecutor = getOrCreateStatementExecutor(tableIdentifier); + jdbcStatementExecutor.addToBatch((JdbcIn) record); + jdbcStatementExecutor.executeBatch(); + flushFlag = true; + break; + } catch (Exception e) { + LOG.error("Flush one record tableIdentifier:{} ,retryTimes:{} get err:", + tableIdentifier, retryTimes, e); + getAndSetPkFromErrMsg(e.getMessage(), tableIdentifier); + tableException = e; + updateOneExecutor(true, tableIdentifier); + try { + Thread.sleep(1000 * retryTimes); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + throw new IOException( + "unable to flush; interrupted while doing another attempt", e); + } + } + } + if (!flushFlag && null != tableException) { + LOG.info("Put tableIdentifier:{} exception:{}", + tableIdentifier, tableException.getMessage()); + tableExceptionMap.put(tableIdentifier, tableException); + if (isIgnoreTableException) { + LOG.info("Stop write table:{} because occur exception", + tableIdentifier); + continue; + } + } + } + } + tableIdRecordList.clear(); + } + } + + /** + * Executes prepared statement and closes all resources of this instance. + */ + @Override + public synchronized void close() { + if (!closed) { + closed = true; + + if (this.scheduledFuture != null) { + scheduledFuture.cancel(false); + this.scheduler.shutdown(); + } + + if (batchCount > 0) { + try { + flush(); + } catch (Exception e) { + LOG.warn("Writing records to JDBC failed.", e); + throw new RuntimeException("Writing records to JDBC failed.", e); + } + } + + try { + if (null != jdbcExecMap) { + jdbcExecMap.forEach((tableIdentifier, jdbcExec) -> { + try { + jdbcExec.closeStatements(); + } catch (SQLException e) { + LOG.error("jdbcExec executeBatch get err", e); + } + }); + } + } catch (Exception e) { + LOG.warn("Close JDBC writer failed.", e); + } + } + super.close(); + checkFlushException(); + } + + public boolean getAndSetPkFromErrMsg(String errMsg, String tableIdentifier) { + String rgex = "Detail: Key \\((.*?)\\)=\\("; + Pattern pattern = Pattern.compile(rgex); + Matcher m = pattern.matcher(errMsg); + List<String> pkNameList = new ArrayList<>(); + if (m.find()) { + String[] pkNameArray = m.group(1).split(","); + for (String pkName : pkNameArray) { + pkNameList.add(pkName.trim()); + } + pkNameMap.put(tableIdentifier, pkNameList); + return true; + } + return false; + } + + public void updateOneExecutor(boolean reconnect, String tableIdentifier) { + try { + SimpleJdbcConnectionProvider tableConnectionProvider = connectionExecProviderMap.get(tableIdentifier); + if (reconnect || null == tableConnectionProvider + || !tableConnectionProvider.isConnectionValid()) { + JdbcExec tableJdbcExec = jdbcExecMap.get(tableIdentifier); + if (null != tableJdbcExec) { + tableJdbcExec.closeStatements(); + jdbcExecMap.remove(tableIdentifier); + getOrCreateStatementExecutor(tableIdentifier); + } + } + } catch (SQLException | IOException e) { + LOG.error("jdbcExec updateOneExecutor get err", e); + } + } + + /** + * A factory for creating {@link JdbcBatchStatementExecutor} instance. + * + * @param <T> The type of instance. + */ + public interface StatementExecutorFactory<T extends JdbcBatchStatementExecutor<?>> + extends + Function<RuntimeContext, T>, + Serializable { + + } + +} diff --git a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicOutputFormatBuilder.java b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicOutputFormatBuilder.java index 4a0bfa228..531864cd4 100644 --- a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicOutputFormatBuilder.java +++ b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicOutputFormatBuilder.java @@ -37,9 +37,11 @@ import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.RowType; +import org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy; import org.apache.inlong.sort.base.dirty.DirtyOptions; import org.apache.inlong.sort.base.dirty.sink.DirtySink; import org.apache.inlong.sort.jdbc.internal.JdbcBatchingOutputFormat; +import org.apache.inlong.sort.jdbc.internal.JdbcMultiBatchingOutputFormat; import java.io.Serializable; import java.util.Arrays; @@ -67,6 +69,11 @@ public class JdbcDynamicOutputFormatBuilder implements Serializable { private DataType[] fieldDataTypes; private String inlongMetric; private String auditHostAndPorts; + private String sinkMultipleFormat; + private String databasePattern; + private String tablePattern; + private String schemaPattern; + private SchemaUpdateExceptionPolicy schemaUpdateExceptionPolicy; private DirtyOptions dirtyOptions; private DirtySink<Object> dirtySink; @@ -243,6 +250,32 @@ public class JdbcDynamicOutputFormatBuilder implements Serializable { return this; } + public JdbcDynamicOutputFormatBuilder setSinkMultipleFormat(String sinkMultipleFormat) { + this.sinkMultipleFormat = sinkMultipleFormat; + return this; + } + + public JdbcDynamicOutputFormatBuilder setDatabasePattern(String databasePattern) { + this.databasePattern = databasePattern; + return this; + } + + public JdbcDynamicOutputFormatBuilder setTablePattern(String tablePattern) { + this.tablePattern = tablePattern; + return this; + } + + public JdbcDynamicOutputFormatBuilder setSchemaPattern(String schemaPattern) { + this.schemaPattern = schemaPattern; + return this; + } + + public JdbcDynamicOutputFormatBuilder setSchemaUpdatePolicy( + SchemaUpdateExceptionPolicy schemaUpdateExceptionPolicy) { + this.schemaUpdateExceptionPolicy = schemaUpdateExceptionPolicy; + return this; + } + public JdbcDynamicOutputFormatBuilder setDirtyOptions(DirtyOptions dirtyOptions) { this.dirtyOptions = dirtyOptions; return this; @@ -298,4 +331,23 @@ public class JdbcDynamicOutputFormatBuilder implements Serializable { dirtySink); } } + + public JdbcMultiBatchingOutputFormat<RowData, ?, ?> buildMulti() { + checkNotNull(jdbcOptions, "jdbc options can not be null"); + checkNotNull(dmlOptions, "jdbc dml options can not be null"); + checkNotNull(executionOptions, "jdbc execution options can not be null"); + return new JdbcMultiBatchingOutputFormat<>( + new SimpleJdbcConnectionProvider(jdbcOptions), + executionOptions, + dmlOptions, + appendMode, + jdbcOptions, + sinkMultipleFormat, + databasePattern, + tablePattern, + schemaPattern, + inlongMetric, + auditHostAndPorts, + schemaUpdateExceptionPolicy); + } } \ No newline at end of file diff --git a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableFactory.java b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableFactory.java index 533224f00..e12128ec8 100644 --- a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableFactory.java +++ b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableFactory.java @@ -41,16 +41,25 @@ import java.util.Arrays; import java.util.HashSet; import java.util.Optional; import java.util.Set; +import org.apache.inlong.sort.base.format.DynamicSchemaFormatFactory; +import org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy; +import org.apache.inlong.sort.base.util.JdbcUrlUtils; + +import static org.apache.flink.util.Preconditions.checkState; +import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_ENABLE; +import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_DATABASE_PATTERN; +import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_FORMAT; +import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_SCHEMA_UPDATE_POLICY; +import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_TABLE_PATTERN; import org.apache.inlong.sort.base.dirty.DirtyOptions; import org.apache.inlong.sort.base.dirty.sink.DirtySink; import org.apache.inlong.sort.base.dirty.utils.DirtySinkFactoryUtils; -import org.apache.inlong.sort.base.util.JdbcUrlUtils; -import static org.apache.flink.util.Preconditions.checkState; import static org.apache.inlong.sort.base.Constants.DIRTY_PREFIX; import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT; import static org.apache.inlong.sort.base.Constants.INLONG_METRIC; +import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT; /** * Copy from org.apache.flink:flink-connector-jdbc_2.11:1.13.5 @@ -182,6 +191,14 @@ public class JdbcDynamicTableFactory implements DynamicTableSourceFactory, Dynam .defaultValue(false) .withDescription("Whether to support sink update/delete data without primaryKey."); + public static final ConfigOption<String> SINK_MULTIPLE_SCHEMA_PATTERN = + ConfigOptions.key("sink.multiple.schema-pattern") + .stringType() + .noDefaultValue() + .withDescription("The option 'sink.multiple.schema-pattern' " + + "is used extract table name from the raw binary data, " + + "this is only used in the multiple sink writing scenario."); + @Override public DynamicTableSink createDynamicTableSink(Context context) { final FactoryUtil.TableFactoryHelper helper = @@ -190,12 +207,20 @@ public class JdbcDynamicTableFactory implements DynamicTableSourceFactory, Dynam helper.validateExcept(DIRTY_PREFIX); validateConfigOptions(config); + boolean multipleSink = config.getOptional(SINK_MULTIPLE_ENABLE).orElse(false); + String sinkMultipleFormat = helper.getOptions().getOptional(SINK_MULTIPLE_FORMAT).orElse(null); + String databasePattern = helper.getOptions().getOptional(SINK_MULTIPLE_DATABASE_PATTERN).orElse(null); + String tablePattern = helper.getOptions().getOptional(SINK_MULTIPLE_TABLE_PATTERN).orElse(null); + String schemaPattern = helper.getOptions().getOptional(SINK_MULTIPLE_SCHEMA_PATTERN).orElse(databasePattern); + validateSinkMultiple(multipleSink, sinkMultipleFormat, databasePattern, schemaPattern, tablePattern); JdbcOptions jdbcOptions = getJdbcOptions(config); TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema()); boolean appendMode = config.get(SINK_APPEND_MODE); String inlongMetric = config.getOptional(INLONG_METRIC).orElse(null); String auditHostAndPorts = config.getOptional(INLONG_AUDIT).orElse(null); + SchemaUpdateExceptionPolicy schemaUpdateExceptionPolicy = + helper.getOptions().getOptional(SINK_MULTIPLE_SCHEMA_UPDATE_POLICY).orElse(null); // Build the dirty data side-output final DirtyOptions dirtyOptions = DirtyOptions.fromConfig(helper.getOptions()); final DirtySink<Object> dirtySink = DirtySinkFactoryUtils.createDirtySink(context, dirtyOptions); @@ -205,8 +230,14 @@ public class JdbcDynamicTableFactory implements DynamicTableSourceFactory, Dynam getJdbcDmlOptions(jdbcOptions, physicalSchema), physicalSchema, appendMode, + multipleSink, + sinkMultipleFormat, + databasePattern, + tablePattern, + schemaPattern, inlongMetric, auditHostAndPorts, + schemaUpdateExceptionPolicy, dirtyOptions, dirtySink); } @@ -228,6 +259,26 @@ public class JdbcDynamicTableFactory implements DynamicTableSourceFactory, Dynam physicalSchema); } + private void validateSinkMultiple(boolean multipleSink, String sinkMultipleFormat, + String databasePattern, String schemaPattern, String tablePattern) { + Preconditions.checkNotNull(multipleSink, "The option 'sink.multiple.enable' is not allowed null"); + if (multipleSink) { + Preconditions.checkNotNull(databasePattern, "The option 'sink.multiple.database-pattern'" + + " is not allowed blank when the option 'sink.multiple.enable' is 'true'"); + Preconditions.checkNotNull(schemaPattern, "The option 'sink.multiple.schema-pattern'" + + " is not allowed blank when the option 'sink.multiple.enable' is 'true'"); + Preconditions.checkNotNull(tablePattern, "The option 'sink.multiple.table-pattern' " + + "is not allowed blank when the option 'sink.multiple.enable' is 'true'"); + Preconditions.checkNotNull(sinkMultipleFormat, "The option 'sink.multiple.format' " + + "is not allowed blank when the option 'sink.multiple.enable' is 'true'"); + DynamicSchemaFormatFactory.getFormat(sinkMultipleFormat); + Set<String> supportFormats = DynamicSchemaFormatFactory.SUPPORT_FORMATS.keySet(); + Preconditions.checkArgument(supportFormats.contains(sinkMultipleFormat), String.format( + "Unsupported value '%s' for '%s'. Supported values are %s.", + sinkMultipleFormat, SINK_MULTIPLE_FORMAT.key(), supportFormats)); + } + } + private JdbcOptions getJdbcOptions(ReadableConfig readableConfig) { String url = JdbcUrlUtils.replaceInvalidUrlProperty(readableConfig.get(URL)); Optional<String> dialectImplOptional = readableConfig.getOptional(DIALECT_IMPL); @@ -334,6 +385,12 @@ public class JdbcDynamicTableFactory implements DynamicTableSourceFactory, Dynam optionalOptions.add(FactoryUtil.SINK_PARALLELISM); optionalOptions.add(MAX_RETRY_TIMEOUT); optionalOptions.add(DIALECT_IMPL); + optionalOptions.add(SINK_MULTIPLE_ENABLE); + optionalOptions.add(SINK_MULTIPLE_FORMAT); + optionalOptions.add(SINK_MULTIPLE_DATABASE_PATTERN); + optionalOptions.add(SINK_MULTIPLE_TABLE_PATTERN); + optionalOptions.add(SINK_MULTIPLE_SCHEMA_PATTERN); + optionalOptions.add(SINK_MULTIPLE_SCHEMA_UPDATE_POLICY); optionalOptions.add(INLONG_METRIC); optionalOptions.add(INLONG_AUDIT); return optionalOptions; diff --git a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableSink.java b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableSink.java index e214e61d1..d3098ee11 100644 --- a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableSink.java +++ b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableSink.java @@ -28,6 +28,7 @@ import org.apache.flink.table.connector.sink.DynamicTableSink; import org.apache.flink.table.connector.sink.SinkFunctionProvider; import org.apache.flink.table.data.RowData; import org.apache.flink.types.RowKind; +import org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy; import org.apache.inlong.sort.base.dirty.DirtyOptions; import org.apache.inlong.sort.base.dirty.sink.DirtySink; import org.apache.inlong.sort.jdbc.internal.GenericJdbcSinkFunction; @@ -52,9 +53,15 @@ public class JdbcDynamicTableSink implements DynamicTableSink { private final TableSchema tableSchema; private final String dialectName; + private final boolean multipleSink; private final String inlongMetric; private final String auditHostAndPorts; private final boolean appendMode; + private final String sinkMultipleFormat; + private final String databasePattern; + private final String tablePattern; + private final String schemaPattern; + private final SchemaUpdateExceptionPolicy schemaUpdateExceptionPolicy; private final DirtyOptions dirtyOptions; private @Nullable final DirtySink<Object> dirtySink; @@ -65,8 +72,14 @@ public class JdbcDynamicTableSink implements DynamicTableSink { JdbcDmlOptions dmlOptions, TableSchema tableSchema, boolean appendMode, + boolean multipleSink, + String sinkMultipleFormat, + String databasePattern, + String tablePattern, + String schemaPattern, String inlongMetric, String auditHostAndPorts, + SchemaUpdateExceptionPolicy schemaUpdateExceptionPolicy, DirtyOptions dirtyOptions, @Nullable DirtySink<Object> dirtySink) { this.jdbcOptions = jdbcOptions; @@ -75,15 +88,23 @@ public class JdbcDynamicTableSink implements DynamicTableSink { this.tableSchema = tableSchema; this.dialectName = dmlOptions.getDialect().dialectName(); this.appendMode = appendMode; + this.multipleSink = multipleSink; + this.sinkMultipleFormat = sinkMultipleFormat; + this.databasePattern = databasePattern; + this.tablePattern = tablePattern; + this.schemaPattern = schemaPattern; this.inlongMetric = inlongMetric; this.auditHostAndPorts = auditHostAndPorts; + this.schemaUpdateExceptionPolicy = schemaUpdateExceptionPolicy; this.dirtyOptions = dirtyOptions; this.dirtySink = dirtySink; } @Override public ChangelogMode getChangelogMode(ChangelogMode requestedMode) { - validatePrimaryKey(requestedMode); + if (!multipleSink) { + validatePrimaryKey(requestedMode); + } return ChangelogMode.newBuilder() .addContainedKind(RowKind.INSERT) .addContainedKind(RowKind.DELETE) @@ -103,25 +124,37 @@ public class JdbcDynamicTableSink implements DynamicTableSink { final TypeInformation<RowData> rowDataTypeInformation = context.createTypeInformation(tableSchema.toRowDataType()); final JdbcDynamicOutputFormatBuilder builder = new JdbcDynamicOutputFormatBuilder(); - builder.setAppendMode(appendMode); builder.setJdbcOptions(jdbcOptions); builder.setJdbcDmlOptions(dmlOptions); builder.setJdbcExecutionOptions(executionOptions); - builder.setRowDataTypeInfo(rowDataTypeInformation); - builder.setFieldDataTypes(tableSchema.getFieldDataTypes()); builder.setInLongMetric(inlongMetric); builder.setAuditHostAndPorts(auditHostAndPorts); builder.setDirtyOptions(dirtyOptions); builder.setDirtySink(dirtySink); - return SinkFunctionProvider.of( - new GenericJdbcSinkFunction<>(builder.build()), jdbcOptions.getParallelism()); + if (multipleSink) { + builder.setSinkMultipleFormat(sinkMultipleFormat); + builder.setDatabasePattern(databasePattern); + builder.setTablePattern(tablePattern); + builder.setSchemaPattern(schemaPattern); + builder.setSchemaUpdatePolicy(schemaUpdateExceptionPolicy); + return SinkFunctionProvider.of( + new GenericJdbcSinkFunction<>(builder.buildMulti()), jdbcOptions.getParallelism()); + } else { + builder.setRowDataTypeInfo(rowDataTypeInformation); + builder.setFieldDataTypes(tableSchema.getFieldDataTypes()); + return SinkFunctionProvider.of( + new GenericJdbcSinkFunction<>(builder.build()), jdbcOptions.getParallelism()); + } } @Override public DynamicTableSink copy() { return new JdbcDynamicTableSink(jdbcOptions, executionOptions, dmlOptions, - tableSchema, appendMode, inlongMetric, auditHostAndPorts, dirtyOptions, dirtySink); + tableSchema, appendMode, multipleSink, sinkMultipleFormat, + databasePattern, tablePattern, schemaPattern, + inlongMetric, auditHostAndPorts, + schemaUpdateExceptionPolicy, dirtyOptions, dirtySink); } @Override