This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new f2110a4316 [INLONG-10055][Sort] Support flink-connector-jdbc based on flink 1.18 (#10073) f2110a4316 is described below commit f2110a43167cf12646956730a7997b8099b88d9d Author: AloysZhang <aloyszh...@apache.org> AuthorDate: Fri Apr 26 18:52:12 2024 +0800 [INLONG-10055][Sort] Support flink-connector-jdbc based on flink 1.18 (#10073) --- .../src/main/assemblies/sort-connectors-v1.18.xml | 9 + .../sort-flink-v1.18/sort-connectors/jdbc/pom.xml | 117 +++++ .../jdbc/internal/GenericJdbcSinkFunction.java | 82 ++++ .../sort/jdbc/internal/JdbcOutputFormat.java | 499 +++++++++++++++++++++ .../jdbc/internal/TableJdbcUpsertOutputFormat.java | 225 ++++++++++ .../sort/jdbc/table/JdbcDynamicTableFactory.java | 357 +++++++++++++++ .../sort/jdbc/table/JdbcDynamicTableSink.java | 148 ++++++ .../sort/jdbc/table/JdbcOutputFormatBuilder.java | 283 ++++++++++++ .../org.apache.flink.table.factories.Factory | 16 + .../sort-flink-v1.18/sort-connectors/pom.xml | 1 + licenses/inlong-sort-connectors/LICENSE | 9 + 11 files changed, 1746 insertions(+) diff --git a/inlong-distribution/src/main/assemblies/sort-connectors-v1.18.xml b/inlong-distribution/src/main/assemblies/sort-connectors-v1.18.xml index 61465915fa..04e1fce16b 100644 --- a/inlong-distribution/src/main/assemblies/sort-connectors-v1.18.xml +++ b/inlong-distribution/src/main/assemblies/sort-connectors-v1.18.xml @@ -36,5 +36,14 @@ <fileMode>0644</fileMode> </fileSet> + <fileSet> + <directory>../inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/jdbc/target</directory> + <outputDirectory>inlong-sort/connectors</outputDirectory> + <includes> + <include>sort-connector-jdbc-v1.18-${project.version}.jar</include> + </includes> + <fileMode>0644</fileMode> + </fileSet> + </fileSets> </assembly> diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/jdbc/pom.xml b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/jdbc/pom.xml new file mode 100644 
index 0000000000..dcd7e24afc --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/jdbc/pom.xml @@ -0,0 +1,117 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.inlong</groupId> + <artifactId>sort-connectors-v1.18</artifactId> + <version>1.13.0-SNAPSHOT</version> + </parent> + + <artifactId>sort-connector-jdbc-v1.18</artifactId> + <packaging>jar</packaging> + <name>Apache InLong - Sort-connector-jdbc</name> + + <properties> + <flink.connector.jdbc.version>3.1.2-1.18</flink.connector.jdbc.version> + <inlong.root.dir>${project.parent.parent.parent.parent.parent.basedir}</inlong.root.dir> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.inlong</groupId> + <artifactId>sort-connector-base</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-base</artifactId> + 
<version>${flink.version}</version> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-jdbc</artifactId> + <version>${flink.connector.jdbc.version}</version> + </dependency> + + <!--for mysql--> + <dependency> + <groupId>mysql</groupId> + <artifactId>mysql-connector-java</artifactId> + <scope>compile</scope> + </dependency> + + <!-- Postgres --> + + <dependency> + <groupId>org.postgresql</groupId> + <artifactId>postgresql</artifactId> + </dependency> + + <!-- Oracle --> + <dependency> + <groupId>com.oracle.database.jdbc</groupId> + <artifactId>ojdbc8</artifactId> + </dependency> + + <!-- SQL Server --> + <dependency> + <groupId>com.microsoft.sqlserver</groupId> + <artifactId>mssql-jdbc</artifactId> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <executions> + <execution> + <id>shade-flink</id> + <goals> + <goal>shade</goal> + </goals> + <phase>package</phase> + <configuration> + <filters> + <filter> + <artifact>*:*</artifact> + <excludes> + <exclude>META-INF/*.SF</exclude> + <exclude>META-INF/*.DSA</exclude> + <exclude>META-INF/*.RSA</exclude> + </excludes> + </filter> + <filter> + <artifact>org.apache.inlong:sort-connector-*</artifact> + <includes> + <include>org/apache/inlong/**</include> + <include>META-INF/services/org.apache.flink.table.factories.Factory</include> + </includes> + </filter> + </filters> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + +</project> diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/GenericJdbcSinkFunction.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/GenericJdbcSinkFunction.java new file mode 100644 index 0000000000..39c6e45ec0 --- /dev/null +++ 
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/GenericJdbcSinkFunction.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.jdbc.internal; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.InputTypeConfigurable; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nonnull; + +import java.io.IOException; + +/** A generic SinkFunction for JDBC. 
+ * Modified from {@link org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction} + * */ +@Internal +public class GenericJdbcSinkFunction<T> extends RichSinkFunction<T> + implements + CheckpointedFunction, + InputTypeConfigurable { + + private final JdbcOutputFormat<T, ?, ?> outputFormat; + + public GenericJdbcSinkFunction(@Nonnull JdbcOutputFormat<T, ?, ?> outputFormat) { + this.outputFormat = Preconditions.checkNotNull(outputFormat); + } + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + RuntimeContext ctx = getRuntimeContext(); + outputFormat.setRuntimeContext(ctx); + outputFormat.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks()); + } + + @Override + public void invoke(T value, Context context) throws IOException { + outputFormat.writeRecord(value); + } + + @Override + public void initializeState(FunctionInitializationContext context) { + } + + @Override + public void snapshotState(FunctionSnapshotContext context) throws Exception { + outputFormat.flush(); + } + + @Override + public void close() { + outputFormat.close(); + } + + @Override + public void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) { + outputFormat.setInputType(type, executionConfig); + } +} diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcOutputFormat.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcOutputFormat.java new file mode 100644 index 0000000000..835ed77e00 --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcOutputFormat.java @@ -0,0 +1,499 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.jdbc.internal; + +import org.apache.inlong.sort.base.metric.MetricOption; +import org.apache.inlong.sort.base.metric.SinkMetricData; +import org.apache.inlong.sort.base.util.CalculateObjectSizeUtils; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.io.RichOutputFormat; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.InputTypeConfigurable; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.jdbc.JdbcExecutionOptions; +import org.apache.flink.connector.jdbc.JdbcStatementBuilder; +import org.apache.flink.connector.jdbc.internal.connection.JdbcConnectionProvider; +import org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider; +import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor; +import org.apache.flink.connector.jdbc.internal.options.InternalJdbcConnectionOptions; +import 
org.apache.flink.connector.jdbc.internal.options.JdbcDmlOptions; +import org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatementImpl; +import org.apache.flink.connector.jdbc.utils.JdbcUtils; +import org.apache.flink.types.Row; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.concurrent.ExecutorThreadFactory; +import org.apache.flink.util.function.SerializableFunction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.io.Flushable; +import java.io.IOException; +import java.io.Serializable; +import java.sql.Connection; +import java.sql.SQLException; +import java.util.HashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; + +import static org.apache.flink.connector.jdbc.utils.JdbcUtils.setRecordToStatement; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** A JDBC outputFormat that supports batching records before writing records to database. 
+ * Modified from {@link org.apache.flink.connector.jdbc.internal.JdbcOutputFormat} + * */ +@Internal +public class JdbcOutputFormat<In, JdbcIn, JdbcExec extends JdbcBatchStatementExecutor<JdbcIn>> + extends + RichOutputFormat<In> + implements + Flushable, + InputTypeConfigurable { + + protected final JdbcConnectionProvider connectionProvider; + @Nullable + private TypeSerializer<In> serializer; + + // audit + private final String inlongMetric; + private final String auditHostAndPorts; + private final String auditKeys; + private SinkMetricData sinkMetricData; + private Long rowCount = 0L; + private Long dataSize = 0L; + + @Override + @SuppressWarnings("unchecked") + public void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) { + if (executionConfig.isObjectReuseEnabled()) { + this.serializer = (TypeSerializer<In>) type.createSerializer(executionConfig); + } + } + + /** + * An interface to extract a value from a given argument. + * + * @param <F> The type of the given argument + * @param <T> The type of the return value + */ + public interface RecordExtractor<F, T> extends Function<F, T>, Serializable { + + static <T> JdbcOutputFormat.RecordExtractor<T, T> identity() { + return x -> x; + } + } + + /** + * A factory for creating {@link JdbcBatchStatementExecutor} instance. + * + * @param <T> The type of instance.
+ */ + public interface StatementExecutorFactory<T extends JdbcBatchStatementExecutor<?>> + extends + SerializableFunction<RuntimeContext, T> { + } + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(JdbcOutputFormat.class); + + private final JdbcExecutionOptions executionOptions; + private final JdbcOutputFormat.StatementExecutorFactory<JdbcExec> statementExecutorFactory; + private final JdbcOutputFormat.RecordExtractor<In, JdbcIn> jdbcRecordExtractor; + + private transient JdbcExec jdbcStatementExecutor; + private transient int batchCount = 0; + private transient volatile boolean closed = false; + + private transient ScheduledExecutorService scheduler; + private transient ScheduledFuture<?> scheduledFuture; + private transient volatile Exception flushException; + + public JdbcOutputFormat( + @Nonnull JdbcConnectionProvider connectionProvider, + @Nonnull JdbcExecutionOptions executionOptions, + @Nonnull JdbcOutputFormat.StatementExecutorFactory<JdbcExec> statementExecutorFactory, + @Nonnull JdbcOutputFormat.RecordExtractor<In, JdbcIn> recordExtractor, + String inlongMetric, + String auditHostAndPorts, + String auditKeys) { + this.connectionProvider = checkNotNull(connectionProvider); + this.executionOptions = checkNotNull(executionOptions); + this.statementExecutorFactory = checkNotNull(statementExecutorFactory); + this.jdbcRecordExtractor = checkNotNull(recordExtractor); + // audit + this.inlongMetric = inlongMetric; + this.auditHostAndPorts = auditHostAndPorts; + this.auditKeys = auditKeys; + } + + @Override + public void configure(Configuration parameters) { + } + + /** + * Connects to the target database and initializes the prepared statement. + * + * @param taskNumber The number of the parallel instance. 
+ */ + @Override + public void open(int taskNumber, int numTasks) throws IOException { + try { + connectionProvider.getOrEstablishConnection(); + } catch (Exception e) { + throw new IOException("unable to open JDBC writer", e); + } + // audit + MetricOption metricOption = MetricOption.builder() + .withInlongLabels(inlongMetric) + .withAuditAddress(auditHostAndPorts) + .withAuditKeys(auditKeys) + .build(); + if (metricOption != null) { + sinkMetricData = new SinkMetricData(metricOption, getRuntimeContext().getMetricGroup()); + } + + jdbcStatementExecutor = createAndOpenStatementExecutor(statementExecutorFactory); + if (executionOptions.getBatchIntervalMs() != 0 && executionOptions.getBatchSize() != 1) { + this.scheduler = + Executors.newScheduledThreadPool( + 1, new ExecutorThreadFactory("jdbc-upsert-output-format")); + this.scheduledFuture = + this.scheduler.scheduleWithFixedDelay( + () -> { + synchronized (JdbcOutputFormat.this) { + if (!closed) { + try { + flush(); + } catch (Exception e) { + flushException = e; + } + } + } + }, + executionOptions.getBatchIntervalMs(), + executionOptions.getBatchIntervalMs(), + TimeUnit.MILLISECONDS); + } + } + + private JdbcExec createAndOpenStatementExecutor( + JdbcOutputFormat.StatementExecutorFactory<JdbcExec> statementExecutorFactory) throws IOException { + JdbcExec exec = statementExecutorFactory.apply(getRuntimeContext()); + try { + exec.prepareStatements(connectionProvider.getConnection()); + } catch (SQLException e) { + throw new IOException("unable to open JDBC writer", e); + } + return exec; + } + + private void checkFlushException() { + if (flushException != null) { + throw new RuntimeException("Writing records to JDBC failed.", flushException); + } + } + + @Override + public final synchronized void writeRecord(In record) throws IOException { + checkFlushException(); + updateMetric(record); + try { + In recordCopy = copyIfNecessary(record); + addToBatch(record, jdbcRecordExtractor.apply(recordCopy)); + batchCount++; + 
if (executionOptions.getBatchSize() > 0 + && batchCount >= executionOptions.getBatchSize()) { + flush(); + if (sinkMetricData != null) { + sinkMetricData.invoke(rowCount, dataSize); + } + } + } catch (Exception e) { + throw new IOException("Writing records to JDBC failed.", e); + } + } + + private void updateMetric(In record) { + rowCount++; + dataSize += CalculateObjectSizeUtils.getDataSize(record); + } + + private In copyIfNecessary(In record) { + return serializer == null ? record : serializer.copy(record); + } + + protected void addToBatch(In original, JdbcIn extracted) throws SQLException { + jdbcStatementExecutor.addToBatch(extracted); + } + + @Override + public synchronized void flush() throws IOException { + checkFlushException(); + + for (int i = 0; i <= executionOptions.getMaxRetries(); i++) { + try { + attemptFlush(); + batchCount = 0; + break; + } catch (SQLException e) { + LOG.error("JDBC executeBatch error, retry times = {}", i, e); + if (i >= executionOptions.getMaxRetries()) { + throw new IOException(e); + } + try { + if (!connectionProvider.isConnectionValid()) { + updateExecutor(true); + } + } catch (Exception exception) { + LOG.error( + "JDBC connection is not valid, and reestablish connection failed.", + exception); + throw new IOException("Reestablish JDBC connection failed", exception); + } + try { + Thread.sleep(1000 * i); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + throw new IOException( + "unable to flush; interrupted while doing another attempt", e); + } + } + } + // audit, flush metrics data + if (sinkMetricData != null) { + sinkMetricData.flushAuditData(); + } + } + + protected void attemptFlush() throws SQLException { + jdbcStatementExecutor.executeBatch(); + } + + /** Executes prepared statement and closes all resources of this instance. 
*/ + @Override + public synchronized void close() { + if (!closed) { + closed = true; + + if (this.scheduledFuture != null) { + scheduledFuture.cancel(false); + this.scheduler.shutdown(); + } + + if (batchCount > 0) { + try { + flush(); + } catch (Exception e) { + LOG.warn("Writing records to JDBC failed.", e); + throw new RuntimeException("Writing records to JDBC failed.", e); + } + } + + // audit, flush metrics data + if (sinkMetricData != null) { + sinkMetricData.flushAuditData(); + } + + try { + if (jdbcStatementExecutor != null) { + jdbcStatementExecutor.closeStatements(); + } + } catch (SQLException e) { + LOG.warn("Close JDBC writer failed.", e); + } + } + connectionProvider.closeConnection(); + checkFlushException(); + } + + public static JdbcOutputFormat.Builder builder() { + return new JdbcOutputFormat.Builder(); + } + + /** Builder for a {@link JdbcOutputFormat}. */ + public static class Builder { + + private InternalJdbcConnectionOptions options; + private String[] fieldNames; + private String[] keyFields; + private int[] fieldTypes; + + // audit + private String inlongMetric; + private String auditHostAndPorts; + private String auditKeys; + + private JdbcExecutionOptions.Builder executionOptionsBuilder = + JdbcExecutionOptions.builder(); + + /** required, jdbc options. */ + public JdbcOutputFormat.Builder setOptions(InternalJdbcConnectionOptions options) { + this.options = options; + return this; + } + + /** required, field names of this jdbc sink. */ + public JdbcOutputFormat.Builder setFieldNames(String[] fieldNames) { + this.fieldNames = fieldNames; + return this; + } + + /** required, upsert unique keys. */ + public JdbcOutputFormat.Builder setKeyFields(String[] keyFields) { + this.keyFields = keyFields; + return this; + } + + /** required, field types of this jdbc sink. 
+ */ + public JdbcOutputFormat.Builder setFieldTypes(int[] fieldTypes) { + this.fieldTypes = fieldTypes; + return this; + } + + /** + * optional, flush max size (includes all append, upsert and delete records); once this + * number of records has been buffered, the data is flushed. + */ + public JdbcOutputFormat.Builder setFlushMaxSize(int flushMaxSize) { + executionOptionsBuilder.withBatchSize(flushMaxSize); + return this; + } + + /** optional, flush interval in milliseconds; once it elapses, an asynchronous thread flushes the data. */ + public JdbcOutputFormat.Builder setFlushIntervalMills(long flushIntervalMills) { + executionOptionsBuilder.withBatchIntervalMs(flushIntervalMills); + return this; + } + + /** optional, max retry times for jdbc connector. */ + public JdbcOutputFormat.Builder setMaxRetryTimes(int maxRetryTimes) { + executionOptionsBuilder.withMaxRetries(maxRetryTimes); + return this; + } + + public JdbcOutputFormat.Builder setInlongMetric(String inlongMetric) { + this.inlongMetric = inlongMetric; + return this; + } + + public JdbcOutputFormat.Builder setAuditHostAndPorts(String auditHostAndPorts) { + this.auditHostAndPorts = auditHostAndPorts; + return this; + } + + public JdbcOutputFormat.Builder setAuditKeys(String auditKeys) { + this.auditKeys = auditKeys; + return this; + } + + /** + * Finalizes the configuration and checks validity.
+ * + * @return Configured JdbcUpsertOutputFormat + */ + public JdbcOutputFormat<Tuple2<Boolean, Row>, Row, JdbcBatchStatementExecutor<Row>> build() { + checkNotNull(options, "No options supplied."); + checkNotNull(fieldNames, "No fieldNames supplied."); + JdbcDmlOptions dml = + JdbcDmlOptions.builder() + .withTableName(options.getTableName()) + .withDialect(options.getDialect()) + .withFieldNames(fieldNames) + .withKeyFields(keyFields) + .withFieldTypes(fieldTypes) + .build(); + if (dml.getKeyFields().isPresent() && dml.getKeyFields().get().length > 0) { + return new TableJdbcUpsertOutputFormat( + new SimpleJdbcConnectionProvider(options), + dml, + executionOptionsBuilder.build(), + inlongMetric, + auditHostAndPorts, + auditKeys); + } else { + // warn: don't close over builder fields + String sql = + FieldNamedPreparedStatementImpl.parseNamedStatement( + options.getDialect() + .getInsertIntoStatement( + dml.getTableName(), dml.getFieldNames()), + new HashMap<>()); + return new JdbcOutputFormat<>( + new SimpleJdbcConnectionProvider(options), + executionOptionsBuilder.build(), + ctx -> createSimpleRowExecutor( + sql, + dml.getFieldTypes(), + ctx.getExecutionConfig().isObjectReuseEnabled()), + tuple2 -> { + Preconditions.checkArgument(tuple2.f0); + return tuple2.f1; + }, + inlongMetric, + auditHostAndPorts, + auditKeys); + } + } + } + + static JdbcBatchStatementExecutor<Row> createSimpleRowExecutor( + String sql, int[] fieldTypes, boolean objectReuse) { + return JdbcBatchStatementExecutor.simple( + sql, + createRowJdbcStatementBuilder(fieldTypes), + objectReuse ? Row::copy : Function.identity()); + } + + /** + * Creates a {@link JdbcStatementBuilder} for {@link Row} using the provided SQL types array. 
+ * Uses {@link JdbcUtils#setRecordToStatement} + */ + static JdbcStatementBuilder<Row> createRowJdbcStatementBuilder(int[] types) { + return (st, record) -> setRecordToStatement(st, types, record); + } + + public void updateExecutor(boolean reconnect) throws SQLException, ClassNotFoundException { + jdbcStatementExecutor.closeStatements(); + jdbcStatementExecutor.prepareStatements( + reconnect + ? connectionProvider.reestablishConnection() + : connectionProvider.getConnection()); + } + + /** Returns configured {@code JdbcExecutionOptions}. */ + public JdbcExecutionOptions getExecutionOptions() { + return executionOptions; + } + + @VisibleForTesting + public Connection getConnection() { + return connectionProvider.getConnection(); + } +} diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/TableJdbcUpsertOutputFormat.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/TableJdbcUpsertOutputFormat.java new file mode 100644 index 0000000000..5c5ff2978c --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/TableJdbcUpsertOutputFormat.java @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.jdbc.internal; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.connector.jdbc.JdbcExecutionOptions; +import org.apache.flink.connector.jdbc.internal.connection.JdbcConnectionProvider; +import org.apache.flink.connector.jdbc.internal.executor.InsertOrUpdateJdbcExecutor; +import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor; +import org.apache.flink.connector.jdbc.internal.options.JdbcDmlOptions; +import org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatementImpl; +import org.apache.flink.types.Row; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.sql.SQLException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.function.Function; + +import static org.apache.flink.connector.jdbc.utils.JdbcUtils.getPrimaryKey; +import static org.apache.flink.connector.jdbc.utils.JdbcUtils.setRecordToStatement; +import static org.apache.flink.util.Preconditions.checkArgument; + +/** Upsert jdbc output format. 
+ * Modified from {@link org.apache.flink.connector.jdbc.internal.TableJdbcUpsertOutputFormat} + * */ + +class TableJdbcUpsertOutputFormat + extends + JdbcOutputFormat<Tuple2<Boolean, Row>, Row, JdbcBatchStatementExecutor<Row>> { + + private static final Logger LOG = LoggerFactory.getLogger( + TableJdbcUpsertOutputFormat.class); + + private JdbcBatchStatementExecutor<Row> deleteExecutor; + private final StatementExecutorFactory<JdbcBatchStatementExecutor<Row>> deleteStatementExecutorFactory; + + // audit + private String inlongMetric; + private String auditHostAndPorts; + private String auditKeys; + + TableJdbcUpsertOutputFormat( + JdbcConnectionProvider connectionProvider, + JdbcDmlOptions dmlOptions, + JdbcExecutionOptions batchOptions, + String inlongMetric, + String auditHostAndPorts, + String auditKeys) { + this( + connectionProvider, + batchOptions, + ctx -> createUpsertRowExecutor(dmlOptions, ctx), + ctx -> createDeleteExecutor(dmlOptions, ctx), + inlongMetric, + auditHostAndPorts, + auditKeys); + } + + @VisibleForTesting + TableJdbcUpsertOutputFormat( + JdbcConnectionProvider connectionProvider, + JdbcExecutionOptions batchOptions, + StatementExecutorFactory<JdbcBatchStatementExecutor<Row>> statementExecutorFactory, + StatementExecutorFactory<JdbcBatchStatementExecutor<Row>> deleteStatementExecutorFactory, + String inlongMetric, + String auditHostAndPorts, + String auditKeys) { + super(connectionProvider, batchOptions, statementExecutorFactory, tuple2 -> tuple2.f1, + inlongMetric, auditHostAndPorts, auditKeys); + this.deleteStatementExecutorFactory = deleteStatementExecutorFactory; + } + + @Override + public void open(int taskNumber, int numTasks) throws IOException { + super.open(taskNumber, numTasks); + deleteExecutor = deleteStatementExecutorFactory.apply(getRuntimeContext()); + try { + deleteExecutor.prepareStatements(connectionProvider.getConnection()); + } catch (SQLException e) { + throw new IOException(e); + } + } + + private static
JdbcBatchStatementExecutor<Row> createDeleteExecutor( + JdbcDmlOptions dmlOptions, RuntimeContext ctx) { + int[] pkFields = + Arrays.stream(dmlOptions.getFieldNames()) + .mapToInt(Arrays.asList(dmlOptions.getFieldNames())::indexOf) + .toArray(); + int[] pkTypes = + dmlOptions.getFieldTypes() == null + ? null + : Arrays.stream(pkFields).map(f -> dmlOptions.getFieldTypes()[f]).toArray(); + String deleteSql = + FieldNamedPreparedStatementImpl.parseNamedStatement( + dmlOptions + .getDialect() + .getDeleteStatement( + dmlOptions.getTableName(), dmlOptions.getFieldNames()), + new HashMap<>()); + return createKeyedRowExecutor(pkFields, pkTypes, deleteSql); + } + + @Override + protected void addToBatch(Tuple2<Boolean, Row> original, Row extracted) throws SQLException { + if (original.f0) { + super.addToBatch(original, extracted); + } else { + deleteExecutor.addToBatch(extracted); + } + } + + @Override + public synchronized void close() { + try { + super.close(); + } finally { + try { + if (deleteExecutor != null) { + deleteExecutor.closeStatements(); + } + } catch (SQLException e) { + LOG.warn("unable to close delete statement runner", e); + } + } + } + + @Override + protected void attemptFlush() throws SQLException { + super.attemptFlush(); + deleteExecutor.executeBatch(); + } + + @Override + public void updateExecutor(boolean reconnect) throws SQLException, ClassNotFoundException { + super.updateExecutor(reconnect); + deleteExecutor.closeStatements(); + deleteExecutor.prepareStatements(connectionProvider.getConnection()); + } + + private static JdbcBatchStatementExecutor<Row> createKeyedRowExecutor( + int[] pkFields, int[] pkTypes, String sql) { + return JdbcBatchStatementExecutor.keyed( + sql, + createRowKeyExtractor(pkFields), + (st, record) -> setRecordToStatement( + st, pkTypes, createRowKeyExtractor(pkFields).apply(record))); + } + + private static JdbcBatchStatementExecutor<Row> createUpsertRowExecutor( + JdbcDmlOptions opt, RuntimeContext ctx) { + 
checkArgument(opt.getKeyFields().isPresent()); + + int[] pkFields = + Arrays.stream(opt.getKeyFields().get()) + .mapToInt(Arrays.asList(opt.getFieldNames())::indexOf) + .toArray(); + int[] pkTypes = + opt.getFieldTypes() == null + ? null + : Arrays.stream(pkFields).map(f -> opt.getFieldTypes()[f]).toArray(); + + return opt.getDialect() + .getUpsertStatement( + opt.getTableName(), opt.getFieldNames(), opt.getKeyFields().get()) + .map( + sql -> createSimpleRowExecutor( + parseNamedStatement(sql), + opt.getFieldTypes(), + ctx.getExecutionConfig().isObjectReuseEnabled())) + .orElseGet( + () -> new InsertOrUpdateJdbcExecutor<>( + parseNamedStatement( + opt.getDialect() + .getRowExistsStatement( + opt.getTableName(), + opt.getKeyFields().get())), + parseNamedStatement( + opt.getDialect() + .getInsertIntoStatement( + opt.getTableName(), + opt.getFieldNames())), + parseNamedStatement( + opt.getDialect() + .getUpdateStatement( + opt.getTableName(), + opt.getFieldNames(), + opt.getKeyFields().get())), + createRowJdbcStatementBuilder(pkTypes), + createRowJdbcStatementBuilder(opt.getFieldTypes()), + createRowJdbcStatementBuilder(opt.getFieldTypes()), + createRowKeyExtractor(pkFields), + ctx.getExecutionConfig().isObjectReuseEnabled() + ? 
Row::copy + : Function.identity())); + } + + private static String parseNamedStatement(String statement) { + return FieldNamedPreparedStatementImpl.parseNamedStatement(statement, new HashMap<>()); + } + + private static Function<Row, Row> createRowKeyExtractor(int[] pkFields) { + return row -> getPrimaryKey(row, pkFields); + } +} diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableFactory.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableFactory.java new file mode 100644 index 0000000000..866d498133 --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableFactory.java @@ -0,0 +1,357 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.sort.jdbc.table; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.connector.jdbc.JdbcExecutionOptions; +import org.apache.flink.connector.jdbc.dialect.JdbcDialect; +import org.apache.flink.connector.jdbc.dialect.JdbcDialectLoader; +import org.apache.flink.connector.jdbc.internal.options.InternalJdbcConnectionOptions; +import org.apache.flink.connector.jdbc.internal.options.JdbcDmlOptions; +import org.apache.flink.connector.jdbc.internal.options.JdbcReadOptions; +import org.apache.flink.connector.jdbc.table.JdbcDynamicTableSource; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.connector.source.lookup.LookupOptions; +import org.apache.flink.table.connector.source.lookup.cache.DefaultLookupCache; +import org.apache.flink.table.connector.source.lookup.cache.LookupCache; +import org.apache.flink.table.factories.DynamicTableSinkFactory; +import org.apache.flink.table.factories.DynamicTableSourceFactory; +import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; + +import java.time.Duration; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.DRIVER; +import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.LOOKUP_CACHE_MAX_ROWS; +import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.LOOKUP_CACHE_MISSING_KEY; +import static 
org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.LOOKUP_CACHE_TTL; +import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.LOOKUP_MAX_RETRIES; +import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.MAX_RETRY_TIMEOUT; +import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.PASSWORD; +import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.SCAN_AUTO_COMMIT; +import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.SCAN_FETCH_SIZE; +import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.SCAN_PARTITION_COLUMN; +import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.SCAN_PARTITION_LOWER_BOUND; +import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.SCAN_PARTITION_NUM; +import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.SCAN_PARTITION_UPPER_BOUND; +import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.SINK_BUFFER_FLUSH_INTERVAL; +import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.SINK_BUFFER_FLUSH_MAX_ROWS; +import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.SINK_MAX_RETRIES; +import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.SINK_PARALLELISM; +import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.TABLE_NAME; +import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.URL; +import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.USERNAME; +import static org.apache.inlong.sort.base.Constants.AUDIT_KEYS; +import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT; +import static org.apache.inlong.sort.base.Constants.INLONG_METRIC; + +/** + * Factory for creating configured instances of {@link JdbcDynamicTableSource} and {@link + * JdbcDynamicTableSink}. 
+ * Modify from {@link org.apache.flink.connector.jdbc.table.JdbcDynamicTableFactory}
+ *
+ * <p>On the sink side the factory additionally reads the InLong metric and audit options and
+ * passes them to {@link JdbcDynamicTableSink}.
+ */
+@Internal
+public class JdbcDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
+
+    /** Value returned by {@link #factoryIdentifier()}. */
+    public static final String IDENTIFIER = "jdbc-inlong";
+
+    /**
+     * Validates the table options and the physical row type against the dialect loaded for the
+     * URL, then builds a {@link JdbcDynamicTableSink} carrying connection, execution, DML and
+     * audit settings.
+     */
+    @Override
+    public DynamicTableSink createDynamicTableSink(Context context) {
+        final FactoryUtil.TableFactoryHelper helper =
+                FactoryUtil.createTableFactoryHelper(this, context);
+        final ReadableConfig config = helper.getOptions();
+
+        helper.validate();
+        validateConfigOptions(config, context.getClassLoader());
+        validateDataTypeWithJdbcDialect(
+                context.getPhysicalRowDataType(), config.get(URL), context.getClassLoader());
+        InternalJdbcConnectionOptions jdbcOptions =
+                getJdbcOptions(config, context.getClassLoader());
+
+        // inlong audit: each value is null when its option is not set
+        String inlongMetric = config.getOptional(INLONG_METRIC).orElse(null);
+        String auditHostAndPorts = config.getOptional(INLONG_AUDIT).orElse(null);
+        String auditKeys = config.getOptional(AUDIT_KEYS).orElse(null);
+
+        return new JdbcDynamicTableSink(
+                jdbcOptions,
+                getJdbcExecutionOptions(config),
+                getJdbcDmlOptions(
+                        jdbcOptions,
+                        context.getPhysicalRowDataType(),
+                        context.getPrimaryKeyIndexes()),
+                context.getPhysicalRowDataType(),
+                inlongMetric,
+                auditHostAndPorts,
+                auditKeys);
+    }
+
+    /**
+     * Validates the table options and the physical row type, then builds Flink's own
+     * {@link JdbcDynamicTableSource} with connection, read and lookup settings.
+     */
+    @Override
+    public DynamicTableSource createDynamicTableSource(Context context) {
+        final FactoryUtil.TableFactoryHelper helper =
+                FactoryUtil.createTableFactoryHelper(this, context);
+        final ReadableConfig config = helper.getOptions();
+
+        helper.validate();
+        validateConfigOptions(config, context.getClassLoader());
+        validateDataTypeWithJdbcDialect(
+                context.getPhysicalRowDataType(), config.get(URL), context.getClassLoader());
+
+        return new JdbcDynamicTableSource(
+                getJdbcOptions(helper.getOptions(), context.getClassLoader()),
+                getJdbcReadOptions(helper.getOptions()),
+                helper.getOptions().get(LookupOptions.MAX_RETRIES),
+                getLookupCache(config),
+                context.getPhysicalRowDataType());
+    }
+
+    /** Loads the dialect for {@code url} and lets it validate the row type of {@code dataType}. */
+    private static void validateDataTypeWithJdbcDialect(
+            DataType dataType, String url, ClassLoader classLoader) {
+        final JdbcDialect dialect = JdbcDialectLoader.load(url, classLoader);
+        dialect.validate((RowType) dataType.getLogicalType());
+    }
+
+    /**
+     * Builds the connection options: URL, table name, dialect, sink parallelism (null when
+     * unset) and the connection check timeout in whole seconds. Driver, user name and password
+     * are applied only when configured.
+     */
+    private InternalJdbcConnectionOptions getJdbcOptions(
+            ReadableConfig readableConfig, ClassLoader classLoader) {
+        final String url = readableConfig.get(URL);
+        final InternalJdbcConnectionOptions.Builder builder =
+                InternalJdbcConnectionOptions.builder()
+                        .setClassLoader(classLoader)
+                        .setDBUrl(url)
+                        .setTableName(readableConfig.get(TABLE_NAME))
+                        .setDialect(JdbcDialectLoader.load(url, classLoader))
+                        .setParallelism(readableConfig.getOptional(SINK_PARALLELISM).orElse(null))
+                        .setConnectionCheckTimeoutSeconds(
+                                (int) readableConfig.get(MAX_RETRY_TIMEOUT).getSeconds());
+
+        readableConfig.getOptional(DRIVER).ifPresent(builder::setDriverName);
+        readableConfig.getOptional(USERNAME).ifPresent(builder::setUsername);
+        readableConfig.getOptional(PASSWORD).ifPresent(builder::setPassword);
+        return builder.build();
+    }
+
+    /**
+     * Builds the scan options. Partition bounds and count are read only when a partition column
+     * is configured; fetch size is optional; auto-commit is always set.
+     */
+    private JdbcReadOptions getJdbcReadOptions(ReadableConfig readableConfig) {
+        final Optional<String> partitionColumnName =
+                readableConfig.getOptional(SCAN_PARTITION_COLUMN);
+        final JdbcReadOptions.Builder builder = JdbcReadOptions.builder();
+        if (partitionColumnName.isPresent()) {
+            builder.setPartitionColumnName(partitionColumnName.get());
+            builder.setPartitionLowerBound(readableConfig.get(SCAN_PARTITION_LOWER_BOUND));
+            builder.setPartitionUpperBound(readableConfig.get(SCAN_PARTITION_UPPER_BOUND));
+            builder.setNumPartitions(readableConfig.get(SCAN_PARTITION_NUM));
+        }
+        readableConfig.getOptional(SCAN_FETCH_SIZE).ifPresent(builder::setFetchSize);
+        builder.setAutoCommit(readableConfig.get(SCAN_AUTO_COMMIT));
+        return builder.build();
+    }
+
+    /** Builds the sink execution options: batch size, flush interval in milliseconds, max retries. */
+    private JdbcExecutionOptions getJdbcExecutionOptions(ReadableConfig config) {
+        final JdbcExecutionOptions.Builder builder = new JdbcExecutionOptions.Builder();
+        builder.withBatchSize(config.get(SINK_BUFFER_FLUSH_MAX_ROWS));
+        builder.withBatchIntervalMs(config.get(SINK_BUFFER_FLUSH_INTERVAL).toMillis());
+        builder.withMaxRetries(config.get(SINK_MAX_RETRIES));
+        return builder.build();
+    }
+
+    /**
+     * Builds the DML options. Key fields are the column names at {@code primaryKeyIndexes};
+     * when there is no primary key, {@code null} is passed instead of an empty array.
+     */
+    private JdbcDmlOptions getJdbcDmlOptions(
+            InternalJdbcConnectionOptions jdbcOptions, DataType dataType, int[] primaryKeyIndexes) {
+
+        String[] keyFields =
+                Arrays.stream(primaryKeyIndexes)
+                        .mapToObj(i -> DataType.getFieldNames(dataType).get(i))
+                        .toArray(String[]::new);
+
+        return JdbcDmlOptions.builder()
+                .withTableName(jdbcOptions.getTableName())
+                .withDialect(jdbcOptions.getDialect())
+                .withFieldNames(DataType.getFieldNames(dataType).toArray(new String[0]))
+                .withKeyFields(keyFields.length > 0 ? keyFields : null)
+                .build();
+    }
+
+    /**
+     * Returns the lookup cache, or {@code null} when none is configured. A cache built from the
+     * legacy max-rows/TTL options is replaced by the {@link LookupOptions}-based one whenever the
+     * cache type is {@code PARTIAL}.
+     */
+    @Nullable
+    private LookupCache getLookupCache(ReadableConfig tableOptions) {
+        LookupCache cache = null;
+        // Legacy cache options
+        if (tableOptions.get(LOOKUP_CACHE_MAX_ROWS) > 0
+                && tableOptions.get(LOOKUP_CACHE_TTL).compareTo(Duration.ZERO) > 0) {
+            cache =
+                    DefaultLookupCache.newBuilder()
+                            .maximumSize(tableOptions.get(LOOKUP_CACHE_MAX_ROWS))
+                            .expireAfterWrite(tableOptions.get(LOOKUP_CACHE_TTL))
+                            .cacheMissingKey(tableOptions.get(LOOKUP_CACHE_MISSING_KEY))
+                            .build();
+        }
+        if (tableOptions
+                .get(LookupOptions.CACHE_TYPE)
+                .equals(LookupOptions.LookupCacheType.PARTIAL)) {
+            cache = DefaultLookupCache.fromConfig(tableOptions);
+        }
+        return cache;
+    }
+
+    @Override
+    public String factoryIdentifier() {
+        return IDENTIFIER;
+    }
+
+    /** URL and table name are the only mandatory options. */
+    @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+        Set<ConfigOption<?>> requiredOptions = new HashSet<>();
+        requiredOptions.add(URL);
+        requiredOptions.add(TABLE_NAME);
+        return requiredOptions;
+    }
+
+    /** Connection, scan, lookup and sink options plus the three InLong metric/audit options. */
+    @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        Set<ConfigOption<?>> optionalOptions = new HashSet<>();
+        optionalOptions.add(DRIVER);
+        optionalOptions.add(USERNAME);
+        optionalOptions.add(PASSWORD);
+        optionalOptions.add(SCAN_PARTITION_COLUMN);
+        optionalOptions.add(SCAN_PARTITION_LOWER_BOUND);
+        optionalOptions.add(SCAN_PARTITION_UPPER_BOUND);
+        optionalOptions.add(SCAN_PARTITION_NUM);
+        optionalOptions.add(SCAN_FETCH_SIZE);
+        optionalOptions.add(SCAN_AUTO_COMMIT);
+        optionalOptions.add(LOOKUP_CACHE_MAX_ROWS);
+        optionalOptions.add(LOOKUP_CACHE_TTL);
+        optionalOptions.add(LOOKUP_MAX_RETRIES);
+        optionalOptions.add(LOOKUP_CACHE_MISSING_KEY);
+        optionalOptions.add(SINK_BUFFER_FLUSH_MAX_ROWS);
+        optionalOptions.add(SINK_BUFFER_FLUSH_INTERVAL);
+        optionalOptions.add(SINK_MAX_RETRIES);
+        optionalOptions.add(SINK_PARALLELISM);
+        optionalOptions.add(MAX_RETRY_TIMEOUT);
+        optionalOptions.add(LookupOptions.CACHE_TYPE);
+        optionalOptions.add(LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_ACCESS);
+        optionalOptions.add(LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_WRITE);
+        optionalOptions.add(LookupOptions.PARTIAL_CACHE_MAX_ROWS);
+        optionalOptions.add(LookupOptions.PARTIAL_CACHE_CACHE_MISSING_KEY);
+        optionalOptions.add(LookupOptions.MAX_RETRIES);
+        optionalOptions.add(AUDIT_KEYS);
+        optionalOptions.add(INLONG_METRIC);
+        optionalOptions.add(INLONG_AUDIT);
+        return optionalOptions;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> forwardOptions() {
+        return Stream.of(
+                URL,
+                TABLE_NAME,
+                USERNAME,
+                PASSWORD,
+                DRIVER,
+                SINK_BUFFER_FLUSH_MAX_ROWS,
+                SINK_BUFFER_FLUSH_INTERVAL,
+                SINK_MAX_RETRIES,
+                MAX_RETRY_TIMEOUT,
+                SCAN_FETCH_SIZE,
+                SCAN_AUTO_COMMIT)
+                .collect(Collectors.toSet());
+    }
+
+    /**
+     * Checks option combinations and ranges.
+     *
+     * @throws IllegalArgumentException if an all-or-none group is only partly set, the partition
+     *         lower bound exceeds the upper bound, a retry count is negative, or the retry
+     *         timeout is shorter than one second
+     */
+    private void validateConfigOptions(ReadableConfig config, ClassLoader classLoader) {
+        String jdbcUrl = config.get(URL);
+        // Result is discarded; presumably load(...) rejects a URL no dialect handles - TODO confirm.
+        JdbcDialectLoader.load(jdbcUrl, classLoader);
+
+        checkAllOrNone(config, new ConfigOption<?>[]{USERNAME, PASSWORD});
+
+        checkAllOrNone(
+                config,
+                new ConfigOption<?>[]{
+                        SCAN_PARTITION_COLUMN,
+                        SCAN_PARTITION_NUM,
+                        SCAN_PARTITION_LOWER_BOUND,
+                        SCAN_PARTITION_UPPER_BOUND
+                });
+
+        if (config.getOptional(SCAN_PARTITION_LOWER_BOUND).isPresent()
+                && config.getOptional(SCAN_PARTITION_UPPER_BOUND).isPresent()) {
+            long lowerBound = config.get(SCAN_PARTITION_LOWER_BOUND);
+            long upperBound = config.get(SCAN_PARTITION_UPPER_BOUND);
+            if (lowerBound > upperBound) {
+                throw new IllegalArgumentException(
+                        String.format(
+                                "'%s'='%s' must not be larger than '%s'='%s'.",
+                                SCAN_PARTITION_LOWER_BOUND.key(),
+                                lowerBound,
+                                SCAN_PARTITION_UPPER_BOUND.key(),
+                                upperBound));
+            }
+        }
+
+        checkAllOrNone(config, new ConfigOption<?>[]{LOOKUP_CACHE_MAX_ROWS, LOOKUP_CACHE_TTL});
+
+        if (config.get(LOOKUP_MAX_RETRIES) < 0) {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "The value of '%s' option shouldn't be negative, but is %s.",
+                            LOOKUP_MAX_RETRIES.key(), config.get(LOOKUP_MAX_RETRIES)));
+        }
+
+        if (config.get(SINK_MAX_RETRIES) < 0) {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "The value of '%s' option shouldn't be negative, but is %s.",
+                            SINK_MAX_RETRIES.key(), config.get(SINK_MAX_RETRIES)));
+        }
+
+        if (config.get(MAX_RETRY_TIMEOUT).getSeconds() <= 0) {
+            // The message re-reads the option as a string so the value is shown as configured.
+            throw new IllegalArgumentException(
+                    String.format(
+                            "The value of '%s' option must be in second granularity and shouldn't be smaller than 1 second, but is %s.",
+                            MAX_RETRY_TIMEOUT.key(),
+                            config.get(
+                                    ConfigOptions.key(MAX_RETRY_TIMEOUT.key())
+                                            .stringType()
+                                            .noDefaultValue())));
+        }
+    }
+
+    /**
+     * Requires that either every option in {@code configOptions} is explicitly set or none is.
+     *
+     * @throws IllegalArgumentException listing the option keys when only some are set
+     */
+    private void checkAllOrNone(ReadableConfig config, ConfigOption<?>[] configOptions) {
+        int presentCount = 0;
+        // Wildcard element type instead of the raw ConfigOption avoids an unchecked call.
+        for (ConfigOption<?> configOption : configOptions) {
+            if (config.getOptional(configOption).isPresent()) {
+                presentCount++;
+            }
+        }
+        String[] propertyNames =
+                Arrays.stream(configOptions).map(ConfigOption::key).toArray(String[]::new);
+        Preconditions.checkArgument(
+                configOptions.length == presentCount || presentCount == 0,
+                "Either all or none of the following options should be provided:\n"
+                        + String.join("\n", propertyNames));
+    }
+}
+diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableSink.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableSink.java new file mode 100644 index 0000000000..6721130414 --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableSink.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.inlong.sort.jdbc.table;
+
+import org.apache.inlong.sort.jdbc.internal.GenericJdbcSinkFunction;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
+import org.apache.flink.connector.jdbc.internal.options.InternalJdbcConnectionOptions;
+import org.apache.flink.connector.jdbc.internal.options.JdbcDmlOptions;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.SinkFunctionProvider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.RowKind;
+
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * JDBC implementation of {@link DynamicTableSink} that also carries the InLong metric and audit
+ * settings through to the output format.
+ * Modify from {@link org.apache.flink.connector.jdbc.table.JdbcDynamicTableSink}
+ */
+@Internal
+public class JdbcDynamicTableSink implements DynamicTableSink {
+
+    private final InternalJdbcConnectionOptions jdbcOptions;
+    private final JdbcExecutionOptions executionOptions;
+    private final JdbcDmlOptions dmlOptions;
+    private final DataType physicalRowDataType;
+    /** Dialect name taken from the DML options; used by {@link #asSummaryString()}. */
+    private final String dialectName;
+
+    // audit
+    String inlongMetric;
+    String auditHostAndPorts;
+    String auditKeys;
+
+    /**
+     * Stores all settings as given and derives the dialect name from {@code dmlOptions}.
+     */
+    public JdbcDynamicTableSink(
+            InternalJdbcConnectionOptions jdbcOptions,
+            JdbcExecutionOptions executionOptions,
+            JdbcDmlOptions dmlOptions,
+            DataType physicalRowDataType,
+            String inlongMetric,
+            String auditHostAndPorts,
+            String auditKeys) {
+        this.jdbcOptions = jdbcOptions;
+        this.executionOptions = executionOptions;
+        this.dmlOptions = dmlOptions;
+        this.physicalRowDataType = physicalRowDataType;
+        this.dialectName = dmlOptions.getDialect().dialectName();
+        // audit
+        this.inlongMetric = inlongMetric;
+        this.auditHostAndPorts = auditHostAndPorts;
+        this.auditKeys = auditKeys;
+    }
+
+    /**
+     * Checks the primary-key requirement for the requested mode, then reports that the sink
+     * accepts INSERT, DELETE and UPDATE_AFTER rows.
+     */
+    @Override
+    public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
+        validatePrimaryKey(requestedMode);
+        final RowKind[] acceptedKinds = {RowKind.INSERT, RowKind.DELETE, RowKind.UPDATE_AFTER};
+        ChangelogMode.Builder mode = ChangelogMode.newBuilder();
+        for (RowKind kind : acceptedKinds) {
+            mode = mode.addContainedKind(kind);
+        }
+        return mode.build();
+    }
+
+    /** Fails unless the requested mode is insert-only or the DML options declare key fields. */
+    private void validatePrimaryKey(ChangelogMode requestedMode) {
+        final boolean insertOnly = ChangelogMode.insertOnly().equals(requestedMode);
+        checkState(
+                insertOnly || dmlOptions.getKeyFields().isPresent(),
+                "please declare primary key for sink table when query contains update/delete record.");
+    }
+
+    /**
+     * Configures a {@link JdbcOutputFormatBuilder} with every setting held by this sink and wraps
+     * the built format in a {@link GenericJdbcSinkFunction}, using the parallelism from the
+     * connection options.
+     */
+    @Override
+    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
+        final TypeInformation<RowData> rowTypeInfo =
+                context.createTypeInformation(physicalRowDataType);
+        final JdbcOutputFormatBuilder formatBuilder =
+                new JdbcOutputFormatBuilder()
+                        .setJdbcOptions(jdbcOptions)
+                        .setJdbcDmlOptions(dmlOptions)
+                        .setJdbcExecutionOptions(executionOptions)
+                        .setRowDataTypeInfo(rowTypeInfo)
+                        .setFieldDataTypes(
+                                DataType.getFieldDataTypes(physicalRowDataType)
+                                        .toArray(new DataType[0]))
+                        // audit
+                        .setInlongMetric(inlongMetric)
+                        .setAuditHostAndPorts(auditHostAndPorts)
+                        .setAuditKeys(auditKeys);
+        return SinkFunctionProvider.of(
+                new GenericJdbcSinkFunction<>(formatBuilder.build()), jdbcOptions.getParallelism());
+    }
+
+    /** Returns a new sink built from the same option objects and audit settings. */
+    @Override
+    public DynamicTableSink copy() {
+        return new JdbcDynamicTableSink(
+                jdbcOptions,
+                executionOptions,
+                dmlOptions,
+                physicalRowDataType,
+                inlongMetric,
+                auditHostAndPorts,
+                auditKeys);
+    }
+
+    @Override
+    public String asSummaryString() {
+        return "JDBC:" + dialectName;
+    }
+
+    /** Two sinks are equal when all option objects, the dialect name and the audit settings are equal. */
+    @Override
+    public boolean equals(Object o) {
+        if (o == this) {
+            return true;
+        }
+        if (!(o instanceof JdbcDynamicTableSink)) {
+            return false;
+        }
+        final JdbcDynamicTableSink other = (JdbcDynamicTableSink) o;
+        if (!Objects.equals(jdbcOptions, other.jdbcOptions)) {
+            return false;
+        }
+        if (!Objects.equals(executionOptions, other.executionOptions)) {
+            return false;
+        }
+        if (!Objects.equals(dmlOptions, other.dmlOptions)) {
+            return false;
+        }
+        if (!Objects.equals(physicalRowDataType, other.physicalRowDataType)) {
+            return false;
+        }
+        if (!Objects.equals(dialectName, other.dialectName)) {
+            return false;
+        }
+        if (!Objects.equals(inlongMetric, other.inlongMetric)) {
+            return false;
+        }
+        if (!Objects.equals(auditHostAndPorts, other.auditHostAndPorts)) {
+            return false;
+        }
+        return Objects.equals(auditKeys, other.auditKeys);
+    }
+
+    /** Hashes the same eight values that {@link #equals(Object)} compares, in the same order. */
+    @Override
+    public int hashCode() {
+        return Objects.hash(
+                jdbcOptions,
+                executionOptions,
+                dmlOptions,
+                physicalRowDataType,
+                dialectName,
+                inlongMetric,
+                auditHostAndPorts,
+                auditKeys);
+    }
+}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcOutputFormatBuilder.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcOutputFormatBuilder.java
new file mode 100644
index 0000000000..436b763ee5
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcOutputFormatBuilder.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.inlong.sort.jdbc.table; + +import org.apache.inlong.sort.jdbc.internal.JdbcOutputFormat; + +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.connector.jdbc.JdbcExecutionOptions; +import org.apache.flink.connector.jdbc.converter.JdbcRowConverter; +import org.apache.flink.connector.jdbc.dialect.JdbcDialect; +import org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider; +import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor; +import org.apache.flink.connector.jdbc.internal.executor.TableBufferReducedStatementExecutor; +import org.apache.flink.connector.jdbc.internal.executor.TableBufferedStatementExecutor; +import org.apache.flink.connector.jdbc.internal.executor.TableInsertOrUpdateStatementExecutor; +import org.apache.flink.connector.jdbc.internal.executor.TableSimpleStatementExecutor; +import org.apache.flink.connector.jdbc.internal.options.InternalJdbcConnectionOptions; +import org.apache.flink.connector.jdbc.internal.options.JdbcDmlOptions; +import org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatement; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; + +import java.io.Serializable; +import java.util.Arrays; +import java.util.function.Function; + +import static org.apache.flink.table.data.RowData.createFieldGetter; +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** Builder for {@link JdbcOutputFormat} for Table/SQL. 
+ * Modify from {@link org.apache.flink.connector.jdbc.table.JdbcOutputFormatBuilder}
+ *
+ * <p>{@link #build()} produces an upsert format when the DML options declare at least one key
+ * field and an append-only insert format otherwise; both receive the InLong metric and audit
+ * settings.
+ */
+public class JdbcOutputFormatBuilder implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private InternalJdbcConnectionOptions jdbcOptions;
+    private JdbcExecutionOptions executionOptions;
+    private JdbcDmlOptions dmlOptions;
+    private TypeInformation<RowData> rowDataTypeInformation;
+    private DataType[] fieldDataTypes;
+
+    // audit
+    String inlongMetric;
+    String auditHostAndPorts;
+    String auditKeys;
+
+    public JdbcOutputFormatBuilder() {
+    }
+
+    public JdbcOutputFormatBuilder setJdbcOptions(InternalJdbcConnectionOptions jdbcOptions) {
+        this.jdbcOptions = jdbcOptions;
+        return this;
+    }
+
+    public JdbcOutputFormatBuilder setJdbcExecutionOptions(JdbcExecutionOptions executionOptions) {
+        this.executionOptions = executionOptions;
+        return this;
+    }
+
+    public JdbcOutputFormatBuilder setJdbcDmlOptions(JdbcDmlOptions dmlOptions) {
+        this.dmlOptions = dmlOptions;
+        return this;
+    }
+
+    public JdbcOutputFormatBuilder setRowDataTypeInfo(TypeInformation<RowData> rowDataTypeInfo) {
+        this.rowDataTypeInformation = rowDataTypeInfo;
+        return this;
+    }
+
+    public JdbcOutputFormatBuilder setFieldDataTypes(DataType[] fieldDataTypes) {
+        this.fieldDataTypes = fieldDataTypes;
+        return this;
+    }
+
+    public JdbcOutputFormatBuilder setInlongMetric(String inlongMetric) {
+        this.inlongMetric = inlongMetric;
+        return this;
+    }
+
+    public JdbcOutputFormatBuilder setAuditHostAndPorts(String auditHostAndPorts) {
+        this.auditHostAndPorts = auditHostAndPorts;
+        return this;
+    }
+
+    public JdbcOutputFormatBuilder setAuditKeys(String auditKeys) {
+        this.auditKeys = auditKeys;
+        return this;
+    }
+
+    /**
+     * Builds the output format from the configured settings.
+     *
+     * @return an upsert format when key fields are present and non-empty, otherwise an
+     *         append-only format using the dialect's INSERT statement
+     * @throws NullPointerException if the connection, DML or execution options, the field data
+     *         types or the row type information have not been set
+     */
+    public JdbcOutputFormat<RowData, ?, ?> build() {
+        checkNotNull(jdbcOptions, "jdbc options can not be null");
+        checkNotNull(dmlOptions, "jdbc dml options can not be null");
+        checkNotNull(executionOptions, "jdbc execution options can not be null");
+        // Both are dereferenced below or by the executor factories; fail here with a clear
+        // message instead of an anonymous NullPointerException.
+        checkNotNull(fieldDataTypes, "field data types can not be null");
+        checkNotNull(rowDataTypeInformation, "row data type information can not be null");
+
+        final LogicalType[] logicalTypes =
+                Arrays.stream(fieldDataTypes)
+                        .map(DataType::getLogicalType)
+                        .toArray(LogicalType[]::new);
+        if (dmlOptions.getKeyFields().isPresent() && dmlOptions.getKeyFields().get().length > 0) {
+            // upsert query
+            return new JdbcOutputFormat<>(
+                    new SimpleJdbcConnectionProvider(jdbcOptions),
+                    executionOptions,
+                    ctx -> createBufferReduceExecutor(
+                            dmlOptions, ctx, rowDataTypeInformation, logicalTypes),
+                    JdbcOutputFormat.RecordExtractor.identity(),
+                    inlongMetric,
+                    auditHostAndPorts,
+                    auditKeys);
+        } else {
+            // append only query
+            final String sql =
+                    dmlOptions
+                            .getDialect()
+                            .getInsertIntoStatement(
+                                    dmlOptions.getTableName(), dmlOptions.getFieldNames());
+            return new JdbcOutputFormat<>(
+                    new SimpleJdbcConnectionProvider(jdbcOptions),
+                    executionOptions,
+                    ctx -> createSimpleBufferedExecutor(
+                            ctx,
+                            dmlOptions.getDialect(),
+                            dmlOptions.getFieldNames(),
+                            logicalTypes,
+                            sql,
+                            rowDataTypeInformation),
+                    JdbcOutputFormat.RecordExtractor.identity(),
+                    inlongMetric,
+                    auditHostAndPorts,
+                    auditKeys);
+        }
+    }
+
+    /**
+     * Builds the executor for the upsert path: a {@link TableBufferReducedStatementExecutor}
+     * combining an upsert executor, a delete executor, a primary-key extractor and a value
+     * transform that copies rows only when object reuse is enabled.
+     *
+     * @throws IllegalArgumentException if {@code opt} declares no key fields
+     */
+    private static JdbcBatchStatementExecutor<RowData> createBufferReduceExecutor(
+            JdbcDmlOptions opt,
+            RuntimeContext ctx,
+            TypeInformation<RowData> rowDataTypeInfo,
+            LogicalType[] fieldTypes) {
+        checkArgument(opt.getKeyFields().isPresent());
+        JdbcDialect dialect = opt.getDialect();
+        String tableName = opt.getTableName();
+        String[] pkNames = opt.getKeyFields().get();
+        // Positions of the key columns inside the full field list.
+        int[] pkFields =
+                Arrays.stream(pkNames)
+                        .mapToInt(Arrays.asList(opt.getFieldNames())::indexOf)
+                        .toArray();
+        LogicalType[] pkTypes =
+                Arrays.stream(pkFields).mapToObj(f -> fieldTypes[f]).toArray(LogicalType[]::new);
+        final TypeSerializer<RowData> typeSerializer =
+                rowDataTypeInfo.createSerializer(ctx.getExecutionConfig());
+        final Function<RowData, RowData> valueTransform =
+                ctx.getExecutionConfig().isObjectReuseEnabled()
+                        ? typeSerializer::copy
+                        : Function.identity();
+
+        return new TableBufferReducedStatementExecutor(
+                createUpsertRowExecutor(
+                        dialect,
+                        tableName,
+                        opt.getFieldNames(),
+                        fieldTypes,
+                        pkFields,
+                        pkNames,
+                        pkTypes),
+                createDeleteExecutor(dialect, tableName, pkNames, pkTypes),
+                createRowKeyExtractor(fieldTypes, pkFields),
+                valueTransform);
+    }
+
+    /**
+     * Builds the executor for the append-only path: a simple row executor for {@code sql} wrapped
+     * in a {@link TableBufferedStatementExecutor}, copying rows only when object reuse is enabled.
+     */
+    private static JdbcBatchStatementExecutor<RowData> createSimpleBufferedExecutor(
+            RuntimeContext ctx,
+            JdbcDialect dialect,
+            String[] fieldNames,
+            LogicalType[] fieldTypes,
+            String sql,
+            TypeInformation<RowData> rowDataTypeInfo) {
+        final TypeSerializer<RowData> typeSerializer =
+                rowDataTypeInfo.createSerializer(ctx.getExecutionConfig());
+        return new TableBufferedStatementExecutor(
+                createSimpleRowExecutor(dialect, fieldNames, fieldTypes, sql),
+                ctx.getExecutionConfig().isObjectReuseEnabled()
+                        ? typeSerializer::copy
+                        : Function.identity());
+    }
+
+    /**
+     * Uses the dialect's upsert statement when the dialect returns one; otherwise falls back to
+     * an insert-or-update executor.
+     */
+    private static JdbcBatchStatementExecutor<RowData> createUpsertRowExecutor(
+            JdbcDialect dialect,
+            String tableName,
+            String[] fieldNames,
+            LogicalType[] fieldTypes,
+            int[] pkFields,
+            String[] pkNames,
+            LogicalType[] pkTypes) {
+        return dialect.getUpsertStatement(tableName, fieldNames, pkNames)
+                .map(sql -> createSimpleRowExecutor(dialect, fieldNames, fieldTypes, sql))
+                .orElseGet(
+                        () -> createInsertOrUpdateExecutor(
+                                dialect,
+                                tableName,
+                                fieldNames,
+                                fieldTypes,
+                                pkFields,
+                                pkNames,
+                                pkTypes));
+    }
+
+    /** Builds a simple executor for the dialect's DELETE statement over the key columns. */
+    private static JdbcBatchStatementExecutor<RowData> createDeleteExecutor(
+            JdbcDialect dialect, String tableName, String[] pkNames, LogicalType[] pkTypes) {
+        String deleteSql = dialect.getDeleteStatement(tableName, pkNames);
+        return createSimpleRowExecutor(dialect, pkNames, pkTypes, deleteSql);
+    }
+
+    /**
+     * Builds a {@link TableSimpleStatementExecutor} that prepares {@code sql} as a field-named
+     * statement and binds rows with the dialect's converter for {@code fieldTypes}.
+     */
+    private static JdbcBatchStatementExecutor<RowData> createSimpleRowExecutor(
+            JdbcDialect dialect, String[] fieldNames, LogicalType[] fieldTypes, final String sql) {
+        final JdbcRowConverter rowConverter = dialect.getRowConverter(RowType.of(fieldTypes));
+        return new TableSimpleStatementExecutor(
+                connection -> FieldNamedPreparedStatement.prepareStatement(connection, sql, fieldNames),
+                rowConverter);
+    }
+
+    /**
+     * Builds a {@link TableInsertOrUpdateStatementExecutor} from the dialect's row-exists, insert
+     * and update statements. The exists statement is bound with the key converter, the other two
+     * with the full-row converter.
+     */
+    private static JdbcBatchStatementExecutor<RowData> createInsertOrUpdateExecutor(
+            JdbcDialect dialect,
+            String tableName,
+            String[] fieldNames,
+            LogicalType[] fieldTypes,
+            int[] pkFields,
+            String[] pkNames,
+            LogicalType[] pkTypes) {
+        final String existStmt = dialect.getRowExistsStatement(tableName, pkNames);
+        final String insertStmt = dialect.getInsertIntoStatement(tableName, fieldNames);
+        final String updateStmt = dialect.getUpdateStatement(tableName, fieldNames, pkNames);
+        return new TableInsertOrUpdateStatementExecutor(
+                connection -> FieldNamedPreparedStatement.prepareStatement(
+                        connection, existStmt, pkNames),
+                connection -> FieldNamedPreparedStatement.prepareStatement(
+                        connection, insertStmt, fieldNames),
+                connection -> FieldNamedPreparedStatement.prepareStatement(
+                        connection, updateStmt, fieldNames),
+                dialect.getRowConverter(RowType.of(pkTypes)),
+                dialect.getRowConverter(RowType.of(fieldTypes)),
+                dialect.getRowConverter(RowType.of(fieldTypes)),
+                createRowKeyExtractor(fieldTypes, pkFields));
+    }
+
+    /** Returns a function that copies the key columns of a row into a new key row. */
+    private static Function<RowData, RowData> createRowKeyExtractor(
+            LogicalType[] logicalTypes, int[] pkFields) {
+        final RowData.FieldGetter[] fieldGetters = new RowData.FieldGetter[pkFields.length];
+        for (int i = 0; i < pkFields.length; i++) {
+            fieldGetters[i] = createFieldGetter(logicalTypes[pkFields[i]], pkFields[i]);
+        }
+        return row -> getPrimaryKey(row, fieldGetters);
+    }
+
+    /** Reads each key field (or null) from {@code row} into a new {@link GenericRowData}. */
+    private static RowData getPrimaryKey(RowData row, RowData.FieldGetter[] fieldGetters) {
+        GenericRowData pkRow = new GenericRowData(fieldGetters.length);
+        for (int i = 0; i < fieldGetters.length; i++) {
+            pkRow.setField(i, fieldGetters[i].getFieldOrNull(row));
+        }
+        return pkRow;
+    }
+}
+diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/jdbc/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/jdbc/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory new file mode 100644 index 0000000000..a14e9cc440 --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/jdbc/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +org.apache.inlong.sort.jdbc.table.JdbcDynamicTableFactory \ No newline at end of file diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pom.xml b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pom.xml index a3fe402733..cf81e1a2ac 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pom.xml +++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pom.xml @@ -32,6 +32,7 @@ <modules> <module>pulsar</module> + <module>jdbc</module> </modules> <properties> diff --git a/licenses/inlong-sort-connectors/LICENSE b/licenses/inlong-sort-connectors/LICENSE index bd944b384c..e0443b0fd6 100644 --- a/licenses/inlong-sort-connectors/LICENSE +++ b/licenses/inlong-sort-connectors/LICENSE @@ -838,6 +838,15 @@ Source : org.apache.flink:flink-connector-pulsar:4.1.0-1.18 (Please note that the software have been modified.) License : https://github.com/apache/flink-connector-pulsar/blob/main/LICENSE + 1.3.22 inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableFactory.java + inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableSink.java + inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcOutputFormatBuilder.java + inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/GenericJdbcSinkFunction.java + inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcOutputFormat.java + inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/TableJdbcUpsertOutputFormat.java + Source : org.apache.flink:flink-connector-jdbc (Please note that the software have been modified.) 
+ License : https://github.com/apache/flink-connector-jdbc/blob/main/LICENSE + ======================================================================= Apache InLong Subcomponents: