This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
     new cf28445c76 [INLONG-8358][Sort] Add kafka connector on flink 1.15 (#8713)
cf28445c76 is described below

commit cf28445c7627be0ffbe627f10b274bca3476011a
Author: Hao <1780095+hnrai...@users.noreply.github.com>
AuthorDate: Tue Dec 12 11:03:36 2023 +0800

    [INLONG-8358][Sort] Add kafka connector on flink 1.15 (#8713)
---
 .../src/main/assemblies/sort-connectors-v1.15.xml  |   8 +
 inlong-sort/sort-core/pom.xml                      |   6 +
 .../sort-end-to-end-tests-v1.15/pom.xml            |  14 +-
 ...MysqlToRocksITCase.java => KafkaE2EITCase.java} | 122 +++--
 .../inlong/sort/tests/MysqlToRocksITCase.java      |  13 +-
 .../test/resources/env/kafka_test_kafka_init.txt   |   1 +
 .../src/test/resources/flinkSql/kafka_test.sql     |  61 +++
 .../sort-flink-v1.15/sort-connectors/kafka/pom.xml | 179 +++++++
 .../org/apache/inlong/sort/kafka/KafkaOptions.java |  50 ++
 .../kafka/table/KafkaConnectorOptionsUtil.java     | 585 +++++++++++++++++++++
 .../sort/kafka/table/KafkaDynamicTableFactory.java | 442 ++++++++++++++++
 .../table/UpsertKafkaDynamicTableFactory.java      | 416 +++++++++++++++
 .../org.apache.flink.table.factories.Factory       |  17 +
 .../sort-flink-v1.15/sort-connectors/pom.xml       |   1 +
 licenses/inlong-sort-connectors/LICENSE            |   6 +
 pom.xml                                            |  20 +-
 16 files changed, 1900 insertions(+), 41 deletions(-)

diff --git a/inlong-distribution/src/main/assemblies/sort-connectors-v1.15.xml b/inlong-distribution/src/main/assemblies/sort-connectors-v1.15.xml
index 855b66f858..1df930a929 100644
--- a/inlong-distribution/src/main/assemblies/sort-connectors-v1.15.xml
+++ b/inlong-distribution/src/main/assemblies/sort-connectors-v1.15.xml
@@ -105,6 +105,14 @@
             <includes>
                 <include>sort-connector-hudi-v1.15-${project.version}.jar</include>
             </includes>
+            <fileMode>0644</fileMode>
+        </fileSet>
+        <fileSet>
+            <directory>../inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/target</directory>
+            <outputDirectory>inlong-sort/connectors</outputDirectory>
+            <includes>
+                <include>sort-connector-kafka-v1.15-${project.version}.jar</include>
+            </includes>
             <fileMode>0644</fileMode>
         </fileSet>
     </fileSets>
diff --git a/inlong-sort/sort-core/pom.xml b/inlong-sort/sort-core/pom.xml
index a18d177315..e8974e29f0 100644
--- a/inlong-sort/sort-core/pom.xml
+++ b/inlong-sort/sort-core/pom.xml
@@ -299,6 +299,12 @@
             <version>${project.version}</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>sort-connector-kafka-v1.15</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
     <build>
         <plugins>
diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/pom.xml b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/pom.xml
index 07d8663682..abdb6e6500 100644
--- a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/pom.xml
+++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/pom.xml
@@ -46,12 +46,14 @@
         <dependency>
             <groupId>org.testcontainers</groupId>
             <artifactId>postgresql</artifactId>
-            <version>${testcontainers.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>kafka</artifactId>
         </dependency>
         <dependency>
             <groupId>org.testcontainers</groupId>
             <artifactId>mongodb</artifactId>
-            <version>${testcontainers.version}</version>
         </dependency>
        <dependency>
             <groupId>org.mongodb</groupId>
@@ -223,6 +225,14 @@ <type>jar</type> <outputDirectory>${project.build.directory}/dependencies</outputDirectory> </artifactItem> +
<artifactItem> + <groupId>org.apache.inlong</groupId> + <artifactId>sort-connector-kafka-v1.15</artifactId> + <version>${project.version}</version> + <destFileName>sort-connector-kafka.jar</destFileName> + <type>jar</type> + <outputDirectory>${project.build.directory}/dependencies</outputDirectory> + </artifactItem> </artifactItems> </configuration> <executions> diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/MysqlToRocksITCase.java b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/KafkaE2EITCase.java similarity index 53% copy from inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/MysqlToRocksITCase.java copy to inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/KafkaE2EITCase.java index bbeccd04a5..1399fe2f6f 100644 --- a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/MysqlToRocksITCase.java +++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/KafkaE2EITCase.java @@ -20,6 +20,7 @@ package org.apache.inlong.sort.tests; import org.apache.inlong.sort.tests.utils.FlinkContainerTestEnv; import org.apache.inlong.sort.tests.utils.JdbcProxy; import org.apache.inlong.sort.tests.utils.MySqlContainer; +import org.apache.inlong.sort.tests.utils.PlaceholderResolver; import org.apache.inlong.sort.tests.utils.StarRocksContainer; import org.apache.inlong.sort.tests.utils.TestUtils; @@ -29,9 +30,17 @@ import org.junit.ClassRule; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.testcontainers.containers.Container.ExecResult; +import org.testcontainers.containers.KafkaContainer; import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.utility.DockerImageName; +import java.io.IOException; +import java.net.URI; import java.net.URISyntaxException; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.sql.Connection; @@ -40,7 +49,10 @@ import java.sql.SQLException; import java.sql.Statement; import java.time.Duration; import java.util.Arrays; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.Objects; import static org.apache.inlong.sort.tests.utils.StarRocksManager.INTER_CONTAINER_STAR_ROCKS_ALIAS; import static org.apache.inlong.sort.tests.utils.StarRocksManager.STAR_ROCKS_LOG; @@ -49,28 +61,42 @@ import static org.apache.inlong.sort.tests.utils.StarRocksManager.getNewStarRock import static org.apache.inlong.sort.tests.utils.StarRocksManager.initializeStarRocksTable; /** - * End-to-end tests for sort-connector-postgres-cdc-v1.15 uber jar. - * Test flink sql Mysql cdc to StarRocks + * End-to-end tests for sort-connector-kafka uber jar. 
*/ -public class MysqlToRocksITCase extends FlinkContainerTestEnv { +public class KafkaE2EITCase extends FlinkContainerTestEnv { - private static final Logger LOG = LoggerFactory.getLogger(MysqlToRocksITCase.class); + private static final Logger LOG = LoggerFactory.getLogger(KafkaE2EITCase.class); + public static final Logger MYSQL_LOG = LoggerFactory.getLogger(MySqlContainer.class); + + public static final Logger KAFKA_LOG = LoggerFactory.getLogger(KafkaContainer.class); + + private static final Path kafkaJar = TestUtils.getResource("sort-connector-kafka.jar"); private static final Path mysqlJar = TestUtils.getResource("sort-connector-mysql-cdc.jar"); - private static final Path jdbcJar = TestUtils.getResource("sort-connector-starrocks.jar"); + private static final Path starrocksJar = TestUtils.getResource("sort-connector-starrocks.jar"); private static final Path mysqlJdbcJar = TestUtils.getResource("mysql-driver.jar"); + private static final String sqlFile; static { try { - sqlFile = - Paths.get(MysqlToRocksITCase.class.getResource("/flinkSql/mysql_test.sql").toURI()).toString(); + URI kafkaSqlFile = + Objects.requireNonNull(KafkaE2EITCase.class.getResource("/flinkSql/kafka_test.sql")).toURI(); + sqlFile = Paths.get(kafkaSqlFile).toString(); buildStarRocksImage(); } catch (URISyntaxException e) { throw new RuntimeException(e); } } + @ClassRule + public static final KafkaContainer KAFKA = + new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1")) + .withNetwork(NETWORK) + .withNetworkAliases("kafka") + .withEmbeddedZookeeper() + .withLogConsumer(new Slf4jLogConsumer(KAFKA_LOG)); + @ClassRule public static StarRocksContainer STAR_ROCKS = (StarRocksContainer) new StarRocksContainer(getNewStarRocksImageName()) @@ -86,7 +112,7 @@ public class MysqlToRocksITCase extends FlinkContainerTestEnv { .withDatabaseName("test") .withNetwork(NETWORK) .withNetworkAliases("mysql") - .withLogConsumer(new Slf4jLogConsumer(LOG)); + .withLogConsumer(new Slf4jLogConsumer(MYSQL_LOG)); @Before public void setup() { @@ -103,7 +129,7 @@ public class MysqlToRocksITCase extends FlinkContainerTestEnv { MYSQL_CONTAINER.getPassword()); Statement stat = conn.createStatement(); stat.execute( - "CREATE TABLE test_input1 (\n" + "CREATE TABLE test_input (\n" + " id SERIAL,\n" + " name VARCHAR(255) NOT NULL DEFAULT 'flink',\n" + " description VARCHAR(512),\n" @@ -118,49 +144,83 @@ public class MysqlToRocksITCase extends FlinkContainerTestEnv { @AfterClass public static void teardown() { + if (KAFKA != null) { + KAFKA.stop(); + } + if (MYSQL_CONTAINER != null) { MYSQL_CONTAINER.stop(); } + if (STAR_ROCKS != null) { STAR_ROCKS.stop(); } } + private void initializeKafkaTable(String topic) { + String fileName = "kafka_test_kafka_init.txt"; + int port = KafkaContainer.ZOOKEEPER_PORT; + + Map<String, Object> properties = new HashMap<>(); + properties.put("TOPIC", topic); + properties.put("ZOOKEEPER_PORT", port); + + try { + String createKafkaStatement = getCreateStatement(fileName, properties); + ExecResult result = KAFKA.execInContainer("bash", "-c", createKafkaStatement); + LOG.info("Create kafka topic: {}, std: {}", createKafkaStatement, result.getStdout()); + if (result.getExitCode() != 0) { + throw new RuntimeException("Init kafka topic failed. 
Exit code:" + result.getExitCode()); + } + } catch (IOException | InterruptedException e) { + throw new RuntimeException(e); + } + } + + private String getCreateStatement(String fileName, Map<String, Object> properties) { + URL url = Objects.requireNonNull(KafkaE2EITCase.class.getResource("/env/" + fileName)); + + try { + Path file = Paths.get(url.toURI()); + return PlaceholderResolver.getDefaultResolver().resolveByMap( + new String(Files.readAllBytes(file), StandardCharsets.UTF_8), + properties); + } catch (IOException | URISyntaxException e) { + throw new RuntimeException(e); + } + } + /** - * Test flink sql postgresql cdc to StarRocks + * Test flink sql mysql cdc to starrocks. * - * @throws Exception The exception may throws when execute the case + * @throws Exception The exception may throw when execute the case */ @Test - public void testMysqlUpdateAndDelete() throws Exception { - submitSQLJob(sqlFile, jdbcJar, mysqlJar, mysqlJdbcJar); + public void testKafkaWithSqlFile() throws Exception { + final String topic = "test-topic"; + initializeKafkaTable(topic); + + submitSQLJob(sqlFile, kafkaJar, starrocksJar, mysqlJar, mysqlJdbcJar); waitUntilJobRunning(Duration.ofSeconds(10)); // generate input - try (Connection conn = - DriverManager.getConnection(MYSQL_CONTAINER.getJdbcUrl(), MYSQL_CONTAINER.getUsername(), - MYSQL_CONTAINER.getPassword()); + try (Connection conn = DriverManager.getConnection(MYSQL_CONTAINER.getJdbcUrl(), + MYSQL_CONTAINER.getUsername(), MYSQL_CONTAINER.getPassword()); Statement stat = conn.createStatement()) { - stat.execute( - "INSERT INTO test_input1 " - + "VALUES (1,'jacket','water resistent white wind breaker');"); - stat.execute( - "INSERT INTO test_input1 VALUES (2,'scooter','Big 2-wheel scooter ');"); - stat.execute( - "update test_input1 set name = 'tom' where id = 2;"); - stat.execute( - "delete from test_input1 where id = 1;"); + stat.execute("INSERT INTO test_input VALUES (1,'jacket','water resistant white wind breaker');"); + stat.execute("INSERT INTO test_input VALUES (2,'scooter','Big 2-wheel scooter ');"); } catch (SQLException e) { LOG.error("Update table for CDC failed.", e); throw e; } - JdbcProxy proxy = - new JdbcProxy(STAR_ROCKS.getJdbcUrl(), STAR_ROCKS.getUsername(), - STAR_ROCKS.getPassword(), - STAR_ROCKS.getDriverClassName()); - List<String> expectResult = - Arrays.asList("2,tom,Big 2-wheel scooter "); + JdbcProxy proxy = new JdbcProxy(STAR_ROCKS.getJdbcUrl(), STAR_ROCKS.getUsername(), + STAR_ROCKS.getPassword(), + STAR_ROCKS.getDriverClassName()); + + List<String> expectResult = Arrays.asList( + "1,jacket,water resistant white wind breaker", + "2,scooter,Big 2-wheel scooter "); proxy.checkResultWithTimeout( expectResult, "test_output1", diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/MysqlToRocksITCase.java b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/MysqlToRocksITCase.java index bbeccd04a5..51501772a9 100644 --- a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/MysqlToRocksITCase.java +++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/MysqlToRocksITCase.java @@ -39,7 +39,7 @@ import java.sql.DriverManager; import java.sql.SQLException; import java.sql.Statement; import java.time.Duration; -import java.util.Arrays; +import java.util.Collections; import java.util.List; import static 
org.apache.inlong.sort.tests.utils.StarRocksManager.INTER_CONTAINER_STAR_ROCKS_ALIAS; @@ -155,12 +155,11 @@ public class MysqlToRocksITCase extends FlinkContainerTestEnv { throw e; } - JdbcProxy proxy = - new JdbcProxy(STAR_ROCKS.getJdbcUrl(), STAR_ROCKS.getUsername(), - STAR_ROCKS.getPassword(), - STAR_ROCKS.getDriverClassName()); - List<String> expectResult = - Arrays.asList("2,tom,Big 2-wheel scooter "); + JdbcProxy proxy = new JdbcProxy(STAR_ROCKS.getJdbcUrl(), STAR_ROCKS.getUsername(), + STAR_ROCKS.getPassword(), + STAR_ROCKS.getDriverClassName()); + + List<String> expectResult = Collections.singletonList("2,tom,Big 2-wheel scooter "); proxy.checkResultWithTimeout( expectResult, "test_output1", diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/env/kafka_test_kafka_init.txt b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/env/kafka_test_kafka_init.txt new file mode 100644 index 0000000000..b2f31d78fa --- /dev/null +++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/env/kafka_test_kafka_init.txt @@ -0,0 +1 @@ +kafka-topics --create --topic ${TOPIC} --replication-factor 1 --partitions 1 --zookeeper localhost:${ZOOKEEPER_PORT} \ No newline at end of file diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/flinkSql/kafka_test.sql b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/flinkSql/kafka_test.sql new file mode 100644 index 0000000000..5bda3b9366 --- /dev/null +++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/flinkSql/kafka_test.sql @@ -0,0 +1,61 @@ +CREATE TABLE test_input ( + `id` INT primary key, + name STRING, + description STRING +) WITH ( + 'connector' = 'mysql-cdc-inlong', + 'hostname' = 'mysql', + 'port' = '3306', + 'username' = 'root', + 'password' = 'inlong', + 'database-name' = 'test', + 'table-name' = 'test_input', + 'scan.incremental.snapshot.enabled' = 'false', + 'jdbc.properties.useSSL' = 'false', + 'jdbc.properties.allowPublicKeyRetrieval' = 'true' +); + +CREATE TABLE kafka_load ( + `id` INT NOT NULL primary key, + name STRING, + description STRING +) WITH ( + 'connector' = 'upsert-kafka-inlong', + 'topic' = 'test-topic', + 'properties.bootstrap.servers' = 'kafka:9092', + 'key.format' = 'csv', + 'value.format' = 'csv' +); + +CREATE TABLE kafka_extract ( + `id` INT NOT NULL, + name STRING, + description STRING +) WITH ( + 'connector' = 'kafka-inlong', + 'topic' = 'test-topic', + 'properties.bootstrap.servers' = 'kafka:9092', + 'properties.group.id' = 'testGroup', + 'scan.startup.mode' = 'earliest-offset', + 'format' = 'csv' +); + +CREATE TABLE test_output ( + `id` INT primary key, + name STRING, + description STRING +) WITH ( + 'connector' = 'starrocks-inlong', + 'jdbc-url' = 'jdbc:mysql://starrocks:9030', + 'load-url'='starrocks:8030', + 'database-name'='test', + 'table-name' = 'test_output1', + 'username' = 'inlong', + 'password' = 'inlong', + 'sink.properties.format' = 'json', + 'sink.properties.strip_outer_array' = 'true', + 'sink.buffer-flush.interval-ms' = '1000' +); + +INSERT INTO kafka_load select * from test_input; +INSERT INTO test_output select * from kafka_extract; \ No newline at end of file diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/pom.xml b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/pom.xml new file mode 100644 index 0000000000..61fcc35247 --- /dev/null +++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/pom.xml @@ -0,0 +1,179 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.inlong</groupId> + <artifactId>sort-connectors-v1.15</artifactId> + <version>1.11.0-SNAPSHOT</version> + </parent> + + <artifactId>sort-connector-kafka-v1.15</artifactId> + <packaging>jar</packaging> + <name>Apache InLong - Sort-connector-kafka</name> + + <properties> + <inlong.root.dir>${project.parent.parent.parent.parent.parent.basedir}</inlong.root.dir> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-kafka</artifactId> + </dependency> + <dependency> + <groupId>org.apache.inlong</groupId> + <artifactId>sort-connector-base</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.inlong</groupId> + <artifactId>sort-common</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.inlong</groupId> + <artifactId>audit-sdk</artifactId> + <version>${project.version}</version> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <executions> + <execution> + <id>shade-flink</id> + <goals> + <goal>shade</goal> + </goals> + <phase>package</phase> + <configuration> + <artifactSet> + <includes> + <include>org.apache.inlong:*</include> + <include>org.apache.kafka:*</include> + <include>org.apache.flink:flink-connector-kafka</include> + <include>org.apache.flink:flink-connector-base</include> + <!-- Include fixed version 18.0-13.0 of flink shaded guava --> + <include>org.apache.flink:flink-shaded-guava</include> + <include>org.apache.httpcomponents:*</include> + <include>org.apache.commons:commons-lang3</include> + <include>com.google.protobuf:*</include> + <include>joda-time:*</include> + <include>com.fasterxml.jackson.core:*</include> + <include>com.amazonaws:*</include> + <include>software.amazon.ion:*</include> + <include>commons-logging:commons-logging</include> + </includes> + </artifactSet> + <filters> + <filter> + <artifact>org.apache.kafka:*</artifact> + <excludes> + <exclude>kafka/kafka-version.properties</exclude> + <exclude>LICENSE</exclude> + <!-- Does not contain anything relevant. + Cites a binary dependency on jersey, but this is neither reflected in the + dependency graph, nor are any jersey files bundled. 
--> + <exclude>NOTICE</exclude> + <exclude>common/**</exclude> + </excludes> + </filter> + <filter> + <artifact>org.apache.inlong:sort-connector-*</artifact> + <includes> + <include>org/apache/inlong/**</include> + <include>META-INF/services/org.apache.flink.table.factories.Factory</include> + <include>META-INF/services/org.apache.flink.table.factories.TableFactory</include> + </includes> + </filter> + </filters> + <relocations> + <relocation> + <pattern>org.apache.kafka</pattern> + <shadedPattern>org.apache.inlong.sort.kafka.shaded.org.apache.kafka</shadedPattern> + </relocation> + <relocation> + <pattern>org.apache.commons.logging</pattern> + <shadedPattern>org.apache.inlong.sort.kafka.shaded.org.apache.commons.logging</shadedPattern> + </relocation> + <relocation> + <pattern>org.apache.commons.lang3</pattern> + <shadedPattern>org.apache.inlong.sort.kafka.shaded.org.apache.commons.lang3</shadedPattern> + </relocation> + <relocation> + <pattern>org.apache.http</pattern> + <shadedPattern>org.apache.inlong.sort.kafka.shaded.org.apache.http</shadedPattern> + </relocation> + + <relocation> + <pattern>com.google</pattern> + <shadedPattern>org.apache.inlong.sort.kafka.shaded.com.google</shadedPattern> + </relocation> + <relocation> + <pattern>com.amazonaws</pattern> + <shadedPattern>org.apache.inlong.sort.kafka.shaded.com.amazonaws</shadedPattern> + </relocation> + <relocation> + <pattern>software.amazon.ion</pattern> + <shadedPattern>org.apache.inlong.sort.kafka.shaded.software.amazon.ion</shadedPattern> + </relocation> + <relocation> + <pattern>com.fasterxml.jackson</pattern> + <shadedPattern>org.apache.inlong.sort.kafka.shaded.com.fasterxml.jackson</shadedPattern> + </relocation> + <relocation> + <pattern>org.joda.time</pattern> + <shadedPattern>org.apache.inlong.sort.kafka.shaded.org.joda.time</shadedPattern> + </relocation> + + <relocation> + <pattern>org.apache.inlong.sort.base</pattern> + <shadedPattern>org.apache.inlong.sort.kafka.shaded.org.apache.inlong.sort.base</shadedPattern> + </relocation> + <relocation> + <pattern>org.apache.inlong.sort.configuration</pattern> + <shadedPattern>org.apache.inlong.sort.kafka.shaded.org.apache.inlong.sort.configuration</shadedPattern> + </relocation> + <relocation> + <pattern>org.apache.inlong.sort.protocol</pattern> + <shadedPattern>org.apache.inlong.sort.kafka.shaded.org.apache.inlong.sort.protocol</shadedPattern> + </relocation> + <relocation> + <pattern>org.apache.inlong.sort.schema</pattern> + <shadedPattern>org.apache.inlong.sort.kafka.shaded.org.apache.inlong.sort.schema</shadedPattern> + </relocation> + <relocation> + <pattern>org.apache.inlong.sort.util</pattern> + <shadedPattern>org.apache.inlong.sort.kafka.shaded.org.apache.inlong.sort.util</shadedPattern> + </relocation> + </relocations> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + +</project> diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/KafkaOptions.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/KafkaOptions.java new file mode 100644 index 0000000000..6962eb6948 --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/KafkaOptions.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.kafka; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; + +/** Option utils for Kafka table source sink. */ +public class KafkaOptions { + + private KafkaOptions() { + } + + public static final ConfigOption<String> SINK_MULTIPLE_PARTITION_PATTERN = + ConfigOptions.key("sink.multiple.partition-pattern") + .stringType() + .noDefaultValue() + .withDescription( + "option 'sink.multiple.partition-pattern' used either when the partitioner is raw-hash, or when passing in designated partition field names for custom field partitions"); + + public static final ConfigOption<String> SINK_FIXED_IDENTIFIER = + ConfigOptions.key("sink.fixed.identifier") + .stringType() + .defaultValue("-1"); + + // -------------------------------------------------------------------------------------------- + // Sink specific options + // -------------------------------------------------------------------------------------------- + public static final ConfigOption<Boolean> KAFKA_IGNORE_ALL_CHANGELOG = + ConfigOptions.key("sink.ignore.changelog") + .booleanType() + .defaultValue(false) + .withDescription("Regard upsert delete as insert kind."); + +} diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaConnectorOptionsUtil.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaConnectorOptionsUtil.java new file mode 100644 index 0000000000..06cf40ea49 --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaConnectorOptionsUtil.java @@ -0,0 +1,585 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.sort.kafka.table; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.connector.base.DeliveryGuarantee; +import org.apache.flink.streaming.connectors.kafka.config.StartupMode; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; +import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner; +import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; +import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.factories.DynamicTableFactory; +import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeRoot; +import org.apache.flink.table.types.logical.utils.LogicalTypeChecks; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.InstantiationUtil; +import org.apache.flink.util.Preconditions; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import java.util.regex.Pattern; +import java.util.stream.IntStream; + +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.DELIVERY_GUARANTEE; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FIELDS; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FIELDS_PREFIX; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FORMAT; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_MODE; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_SPECIFIC_OFFSETS; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_TIMESTAMP_MILLIS; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SINK_PARTITIONER; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TOPIC; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TOPIC_PATTERN; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TRANSACTIONAL_ID_PREFIX; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.VALUE_FIELDS_INCLUDE; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.VALUE_FORMAT; +import static org.apache.flink.table.factories.FactoryUtil.FORMAT; + +/** Utilities for {@link KafkaConnectorOptions}. 
*/ +@Internal +class KafkaConnectorOptionsUtil { + + private static final ConfigOption<String> SCHEMA_REGISTRY_SUBJECT = + ConfigOptions.key("schema-registry.subject").stringType().noDefaultValue(); + + // -------------------------------------------------------------------------------------------- + // Option enumerations + // -------------------------------------------------------------------------------------------- + + // Prefix for Kafka specific properties. + public static final String PROPERTIES_PREFIX = "properties."; + + // Sink partitioner. + public static final String SINK_PARTITIONER_VALUE_DEFAULT = "default"; + public static final String SINK_PARTITIONER_VALUE_FIXED = "fixed"; + public static final String SINK_PARTITIONER_VALUE_ROUND_ROBIN = "round-robin"; + + // Other keywords. + private static final String PARTITION = "partition"; + private static final String OFFSET = "offset"; + protected static final String AVRO_CONFLUENT = "avro-confluent"; + protected static final String DEBEZIUM_AVRO_CONFLUENT = "debezium-avro-confluent"; + private static final List<String> SCHEMA_REGISTRY_FORMATS = + Arrays.asList(AVRO_CONFLUENT, DEBEZIUM_AVRO_CONFLUENT); + + // -------------------------------------------------------------------------------------------- + // Validation + // -------------------------------------------------------------------------------------------- + + public static void validateTableSourceOptions(ReadableConfig tableOptions) { + validateSourceTopic(tableOptions); + validateScanStartupMode(tableOptions); + } + + public static void validateTableSinkOptions(ReadableConfig tableOptions) { + validateSinkTopic(tableOptions); + validateSinkPartitioner(tableOptions); + } + + public static void validateSourceTopic(ReadableConfig tableOptions) { + Optional<List<String>> topic = tableOptions.getOptional(TOPIC); + Optional<String> pattern = tableOptions.getOptional(TOPIC_PATTERN); + + if (topic.isPresent() && pattern.isPresent()) { + throw new ValidationException( + "Option 'topic' and 'topic-pattern' shouldn't be set together."); + } + + if (!topic.isPresent() && !pattern.isPresent()) { + throw new ValidationException("Either 'topic' or 'topic-pattern' must be set."); + } + } + + public static void validateSinkTopic(ReadableConfig tableOptions) { + String errorMessageTemp = + "Flink Kafka sink currently only supports single topic, but got %s: %s."; + if (!isSingleTopic(tableOptions)) { + if (tableOptions.getOptional(TOPIC_PATTERN).isPresent()) { + throw new ValidationException( + String.format( + errorMessageTemp, + "'topic-pattern'", + tableOptions.get(TOPIC_PATTERN))); + } else { + throw new ValidationException( + String.format(errorMessageTemp, "'topic'", tableOptions.get(TOPIC))); + } + } + } + + private static void validateScanStartupMode(ReadableConfig tableOptions) { + tableOptions + .getOptional(SCAN_STARTUP_MODE) + .ifPresent( + mode -> { + switch (mode) { + case TIMESTAMP: + if (!tableOptions + .getOptional(SCAN_STARTUP_TIMESTAMP_MILLIS) + .isPresent()) { + throw new ValidationException( + String.format( + "'%s' is required in '%s' startup mode" + + " but missing.", + SCAN_STARTUP_TIMESTAMP_MILLIS.key(), + KafkaConnectorOptions.ScanStartupMode.TIMESTAMP)); + } + + break; + case SPECIFIC_OFFSETS: + if (!tableOptions + .getOptional(SCAN_STARTUP_SPECIFIC_OFFSETS) + .isPresent()) { + throw new ValidationException( + String.format( + "'%s' is required in '%s' startup mode" + + " but missing.", + SCAN_STARTUP_SPECIFIC_OFFSETS.key(), + 
KafkaConnectorOptions.ScanStartupMode.SPECIFIC_OFFSETS)); + } + if (!isSingleTopic(tableOptions)) { + throw new ValidationException( + "Currently Kafka source only supports specific offset for single topic."); + } + String specificOffsets = + tableOptions.get(SCAN_STARTUP_SPECIFIC_OFFSETS); + parseSpecificOffsets( + specificOffsets, SCAN_STARTUP_SPECIFIC_OFFSETS.key()); + + break; + } + }); + } + + private static void validateSinkPartitioner(ReadableConfig tableOptions) { + tableOptions + .getOptional(SINK_PARTITIONER) + .ifPresent( + partitioner -> { + if (partitioner.equals(SINK_PARTITIONER_VALUE_ROUND_ROBIN) + && tableOptions.getOptional(KEY_FIELDS).isPresent()) { + throw new ValidationException( + "Currently 'round-robin' partitioner only works when option 'key.fields' is not specified."); + } else if (partitioner.isEmpty()) { + throw new ValidationException( + String.format( + "Option '%s' should be a non-empty string.", + SINK_PARTITIONER.key())); + } + }); + } + + // -------------------------------------------------------------------------------------------- + // Utilities + // -------------------------------------------------------------------------------------------- + + public static List<String> getSourceTopics(ReadableConfig tableOptions) { + return tableOptions.getOptional(TOPIC).orElse(null); + } + + public static Pattern getSourceTopicPattern(ReadableConfig tableOptions) { + return tableOptions.getOptional(TOPIC_PATTERN).map(Pattern::compile).orElse(null); + } + + private static boolean isSingleTopic(ReadableConfig tableOptions) { + // Option 'topic-pattern' is regarded as multi-topics. + return tableOptions.getOptional(TOPIC).map(t -> t.size() == 1).orElse(false); + } + + /** + * Parses SpecificOffsets String to Map. + * + * <p>SpecificOffsets String format was given as following: + * + * <pre> + * scan.startup.specific-offsets = partition:0,offset:42;partition:1,offset:300 + * </pre> + * + * @return SpecificOffsets with Map format, key is partition, and value is offset + */ + public static Map<Integer, Long> parseSpecificOffsets( + String specificOffsetsStr, String optionKey) { + final Map<Integer, Long> offsetMap = new HashMap<>(); + final String[] pairs = specificOffsetsStr.split(";"); + final String validationExceptionMessage = + String.format( + "Invalid properties '%s' should follow the format " + + "'partition:0,offset:42;partition:1,offset:300', but is '%s'.", + optionKey, specificOffsetsStr); + + if (pairs.length == 0) { + throw new ValidationException(validationExceptionMessage); + } + + for (String pair : pairs) { + if (null == pair || pair.length() == 0 || !pair.contains(",")) { + throw new ValidationException(validationExceptionMessage); + } + + final String[] kv = pair.split(","); + if (kv.length != 2 + || !kv[0].startsWith(PARTITION + ':') + || !kv[1].startsWith(OFFSET + ':')) { + throw new ValidationException(validationExceptionMessage); + } + + String partitionValue = kv[0].substring(kv[0].indexOf(":") + 1); + String offsetValue = kv[1].substring(kv[1].indexOf(":") + 1); + try { + final Integer partition = Integer.valueOf(partitionValue); + final Long offset = Long.valueOf(offsetValue); + offsetMap.put(partition, offset); + } catch (NumberFormatException e) { + throw new ValidationException(validationExceptionMessage, e); + } + } + return offsetMap; + } + + public static StartupOptions getStartupOptions(ReadableConfig tableOptions) { + final Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>(); + final StartupMode startupMode = + tableOptions + 
.getOptional(SCAN_STARTUP_MODE) + .map(KafkaConnectorOptionsUtil::fromOption) + .orElse(StartupMode.GROUP_OFFSETS); + if (startupMode == StartupMode.SPECIFIC_OFFSETS) { + // It will be refactored after support specific offset for multiple topics in + // FLINK-18602. We have already checked tableOptions.get(TOPIC) contains one topic in + // validateScanStartupMode(). + buildSpecificOffsets(tableOptions, tableOptions.get(TOPIC).get(0), specificOffsets); + } + + final StartupOptions options = new StartupOptions(); + options.startupMode = startupMode; + options.specificOffsets = specificOffsets; + if (startupMode == StartupMode.TIMESTAMP) { + options.startupTimestampMillis = tableOptions.get(SCAN_STARTUP_TIMESTAMP_MILLIS); + } + return options; + } + + private static void buildSpecificOffsets( + ReadableConfig tableOptions, + String topic, + Map<KafkaTopicPartition, Long> specificOffsets) { + String specificOffsetsStrOpt = tableOptions.get(SCAN_STARTUP_SPECIFIC_OFFSETS); + final Map<Integer, Long> offsetMap = + parseSpecificOffsets(specificOffsetsStrOpt, SCAN_STARTUP_SPECIFIC_OFFSETS.key()); + offsetMap.forEach( + (partition, offset) -> { + final KafkaTopicPartition topicPartition = + new KafkaTopicPartition(topic, partition); + specificOffsets.put(topicPartition, offset); + }); + } + + /** + * Returns the {@link StartupMode} of Kafka Consumer by passed-in table-specific {@link + * KafkaConnectorOptions.ScanStartupMode}. + */ + private static StartupMode fromOption(KafkaConnectorOptions.ScanStartupMode scanStartupMode) { + switch (scanStartupMode) { + case EARLIEST_OFFSET: + return StartupMode.EARLIEST; + case LATEST_OFFSET: + return StartupMode.LATEST; + case GROUP_OFFSETS: + return StartupMode.GROUP_OFFSETS; + case SPECIFIC_OFFSETS: + return StartupMode.SPECIFIC_OFFSETS; + case TIMESTAMP: + return StartupMode.TIMESTAMP; + + default: + throw new TableException( + "Unsupported startup mode. Validator should have checked that."); + } + } + + static void validateDeliveryGuarantee(ReadableConfig tableOptions) { + if (tableOptions.get(DELIVERY_GUARANTEE) == DeliveryGuarantee.EXACTLY_ONCE + && !tableOptions.getOptional(TRANSACTIONAL_ID_PREFIX).isPresent()) { + throw new ValidationException( + TRANSACTIONAL_ID_PREFIX.key() + + " must be specified when using DeliveryGuarantee.EXACTLY_ONCE."); + } + } + + /** + * Creates an array of indices that determine which physical fields of the table schema to + * include in the key format and the order that those fields have in the key format. + * + * <p>See {@link KafkaConnectorOptions#KEY_FORMAT}, {@link KafkaConnectorOptions#KEY_FIELDS}, + * and {@link KafkaConnectorOptions#KEY_FIELDS_PREFIX} for more information. 
+ */ + public static int[] createKeyFormatProjection( + ReadableConfig options, DataType physicalDataType) { + final LogicalType physicalType = physicalDataType.getLogicalType(); + Preconditions.checkArgument( + physicalType.is(LogicalTypeRoot.ROW), "Row data type expected."); + final Optional<String> optionalKeyFormat = options.getOptional(KEY_FORMAT); + final Optional<List<String>> optionalKeyFields = options.getOptional(KEY_FIELDS); + + if (!optionalKeyFormat.isPresent() && optionalKeyFields.isPresent()) { + throw new ValidationException( + String.format( + "The option '%s' can only be declared if a key format is defined using '%s'.", + KEY_FIELDS.key(), KEY_FORMAT.key())); + } else if (optionalKeyFormat.isPresent() + && (!optionalKeyFields.isPresent() || optionalKeyFields.get().size() == 0)) { + throw new ValidationException( + String.format( + "A key format '%s' requires the declaration of one or more of key fields using '%s'.", + KEY_FORMAT.key(), KEY_FIELDS.key())); + } + + if (!optionalKeyFormat.isPresent()) { + return new int[0]; + } + + final String keyPrefix = options.getOptional(KEY_FIELDS_PREFIX).orElse(""); + + final List<String> keyFields = optionalKeyFields.get(); + final List<String> physicalFields = LogicalTypeChecks.getFieldNames(physicalType); + return keyFields.stream() + .mapToInt( + keyField -> { + final int pos = physicalFields.indexOf(keyField); + // check that field name exists + if (pos < 0) { + throw new ValidationException( + String.format( + "Could not find the field '%s' in the table schema for usage in the key format. " + + "A key field must be a regular, physical column. " + + "The following columns can be selected in the '%s' option:\n" + + "%s", + keyField, KEY_FIELDS.key(), physicalFields)); + } + // check that field name is prefixed correctly + if (!keyField.startsWith(keyPrefix)) { + throw new ValidationException( + String.format( + "All fields in '%s' must be prefixed with '%s' when option '%s' " + + "is set but field '%s' is not prefixed.", + KEY_FIELDS.key(), + keyPrefix, + KEY_FIELDS_PREFIX.key(), + keyField)); + } + return pos; + }) + .toArray(); + } + + /** + * Creates an array of indices that determine which physical fields of the table schema to + * include in the value format. + * + * <p>See {@link KafkaConnectorOptions#VALUE_FORMAT}, {@link + * KafkaConnectorOptions#VALUE_FIELDS_INCLUDE}, and {@link + * KafkaConnectorOptions#KEY_FIELDS_PREFIX} for more information. + */ + public static int[] createValueFormatProjection( + ReadableConfig options, DataType physicalDataType) { + final LogicalType physicalType = physicalDataType.getLogicalType(); + Preconditions.checkArgument( + physicalType.is(LogicalTypeRoot.ROW), "Row data type expected."); + final int physicalFieldCount = LogicalTypeChecks.getFieldCount(physicalType); + final IntStream physicalFields = IntStream.range(0, physicalFieldCount); + + final String keyPrefix = options.getOptional(KEY_FIELDS_PREFIX).orElse(""); + + final KafkaConnectorOptions.ValueFieldsStrategy strategy = options.get(VALUE_FIELDS_INCLUDE); + if (strategy == KafkaConnectorOptions.ValueFieldsStrategy.ALL) { + if (keyPrefix.length() > 0) { + throw new ValidationException( + String.format( + "A key prefix is not allowed when option '%s' is set to '%s'. 
" + + "Set it to '%s' instead to avoid field overlaps.", + VALUE_FIELDS_INCLUDE.key(), + KafkaConnectorOptions.ValueFieldsStrategy.ALL, + KafkaConnectorOptions.ValueFieldsStrategy.EXCEPT_KEY)); + } + return physicalFields.toArray(); + } else if (strategy == KafkaConnectorOptions.ValueFieldsStrategy.EXCEPT_KEY) { + final int[] keyProjection = createKeyFormatProjection(options, physicalDataType); + return physicalFields + .filter(pos -> IntStream.of(keyProjection).noneMatch(k -> k == pos)) + .toArray(); + } + throw new TableException("Unknown value fields strategy:" + strategy); + } + + /** + * Returns a new table context with a default schema registry subject value in the options if + * the format is a schema registry format (e.g. 'avro-confluent') and the subject is not + * defined. + */ + public static DynamicTableFactory.Context autoCompleteSchemaRegistrySubject( + DynamicTableFactory.Context context) { + Map<String, String> tableOptions = context.getCatalogTable().getOptions(); + Map<String, String> newOptions = autoCompleteSchemaRegistrySubject(tableOptions); + if (newOptions.size() > tableOptions.size()) { + // build a new context + return new FactoryUtil.DefaultDynamicTableContext( + context.getObjectIdentifier(), + context.getCatalogTable().copy(newOptions), + context.getEnrichmentOptions(), + context.getConfiguration(), + context.getClassLoader(), + context.isTemporary()); + } else { + return context; + } + } + + private static Map<String, String> autoCompleteSchemaRegistrySubject( + Map<String, String> options) { + Configuration configuration = Configuration.fromMap(options); + // the subject autoComplete should only be used in sink, check the topic first + validateSinkTopic(configuration); + final Optional<String> valueFormat = configuration.getOptional(VALUE_FORMAT); + final Optional<String> keyFormat = configuration.getOptional(KEY_FORMAT); + final Optional<String> format = configuration.getOptional(FORMAT); + final String topic = configuration.get(TOPIC).get(0); + + if (format.isPresent() && SCHEMA_REGISTRY_FORMATS.contains(format.get())) { + autoCompleteSubject(configuration, format.get(), topic + "-value"); + } else if (valueFormat.isPresent() && SCHEMA_REGISTRY_FORMATS.contains(valueFormat.get())) { + autoCompleteSubject(configuration, "value." + valueFormat.get(), topic + "-value"); + } + + if (keyFormat.isPresent() && SCHEMA_REGISTRY_FORMATS.contains(keyFormat.get())) { + autoCompleteSubject(configuration, "key." + keyFormat.get(), topic + "-key"); + } + return configuration.toMap(); + } + + private static void autoCompleteSubject( + Configuration configuration, String format, String subject) { + ConfigOption<String> subjectOption = + ConfigOptions.key(format + "." + SCHEMA_REGISTRY_SUBJECT.key()) + .stringType() + .noDefaultValue(); + if (!configuration.getOptional(subjectOption).isPresent()) { + configuration.setString(subjectOption, subject); + } + } + + /** + * The partitioner can be either "fixed", "round-robin" or a customized partitioner full class + * name. 
+ */ + public static Optional<FlinkKafkaPartitioner<RowData>> getFlinkKafkaPartitioner( + ReadableConfig tableOptions, ClassLoader classLoader) { + return tableOptions + .getOptional(SINK_PARTITIONER) + .flatMap( + (String partitioner) -> { + switch (partitioner) { + case SINK_PARTITIONER_VALUE_FIXED: + return Optional.of(new FlinkFixedPartitioner<>()); + case SINK_PARTITIONER_VALUE_DEFAULT: + case SINK_PARTITIONER_VALUE_ROUND_ROBIN: + return Optional.empty(); + // Default fallback to full class name of the partitioner. + default: + return Optional.of( + initializePartitioner(partitioner, classLoader)); + } + }); + } + + /** Returns a class value with the given class name. */ + private static <T> FlinkKafkaPartitioner<T> initializePartitioner( + String name, ClassLoader classLoader) { + try { + Class<?> clazz = Class.forName(name, true, classLoader); + if (!FlinkKafkaPartitioner.class.isAssignableFrom(clazz)) { + throw new ValidationException( + String.format( + "Sink partitioner class '%s' should extend from the required class %s", + name, FlinkKafkaPartitioner.class.getName())); + } + @SuppressWarnings("unchecked") + final FlinkKafkaPartitioner<T> kafkaPartitioner = + InstantiationUtil.instantiate(name, FlinkKafkaPartitioner.class, classLoader); + + return kafkaPartitioner; + } catch (ClassNotFoundException | FlinkException e) { + throw new ValidationException( + String.format("Could not find and instantiate partitioner class '%s'", name), + e); + } + } + + public static Properties getKafkaProperties(Map<String, String> tableOptions) { + final Properties kafkaProperties = new Properties(); + + if (hasKafkaClientProperties(tableOptions)) { + tableOptions.keySet().stream() + .filter(key -> key.startsWith(PROPERTIES_PREFIX)) + .forEach( + key -> { + final String value = tableOptions.get(key); + final String subKey = key.substring((PROPERTIES_PREFIX).length()); + kafkaProperties.put(subKey, value); + }); + } + return kafkaProperties; + } + + /** + * Decides if the table options contains Kafka client properties that start with prefix + * 'properties'. + */ + private static boolean hasKafkaClientProperties(Map<String, String> tableOptions) { + return tableOptions.keySet().stream().anyMatch(k -> k.startsWith(PROPERTIES_PREFIX)); + } + + // -------------------------------------------------------------------------------------------- + // Inner classes + // -------------------------------------------------------------------------------------------- + + /** Kafka startup options. */ + public static class StartupOptions { + + public StartupMode startupMode; + public Map<KafkaTopicPartition, Long> specificOffsets; + public long startupTimestampMillis; + } + + private KafkaConnectorOptionsUtil() { + } +} diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java new file mode 100644 index 0000000000..c87735ffc8 --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java @@ -0,0 +1,442 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.kafka.table; + +import org.apache.inlong.sort.base.Constants; +import org.apache.inlong.sort.kafka.KafkaOptions; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.connector.base.DeliveryGuarantee; +import org.apache.flink.connector.kafka.source.KafkaSourceOptions; +import org.apache.flink.streaming.connectors.kafka.config.StartupMode; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; +import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; +import org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSink; +import org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource; +import org.apache.flink.streaming.connectors.kafka.table.SinkBufferFlushMode; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.connector.format.DecodingFormat; +import org.apache.flink.table.connector.format.EncodingFormat; +import org.apache.flink.table.connector.format.Format; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.factories.DeserializationFormatFactory; +import org.apache.flink.table.factories.DynamicTableSinkFactory; +import org.apache.flink.table.factories.DynamicTableSourceFactory; +import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.table.factories.FactoryUtil.TableFactoryHelper; +import org.apache.flink.table.factories.SerializationFormatFactory; +import org.apache.flink.table.types.DataType; +import org.apache.flink.types.RowKind; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.time.Duration; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import java.util.Set; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.DELIVERY_GUARANTEE; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FIELDS; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FIELDS_PREFIX; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FORMAT; +import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.PROPS_GROUP_ID; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_MODE; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_SPECIFIC_OFFSETS; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_TIMESTAMP_MILLIS; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_TOPIC_PARTITION_DISCOVERY; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SINK_PARALLELISM; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SINK_PARTITIONER; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TOPIC; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TOPIC_PATTERN; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TRANSACTIONAL_ID_PREFIX; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.VALUE_FIELDS_INCLUDE; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.VALUE_FORMAT; +import static org.apache.flink.table.factories.FactoryUtil.FORMAT; +import static org.apache.inlong.sort.kafka.table.KafkaConnectorOptionsUtil.PROPERTIES_PREFIX; +import static org.apache.inlong.sort.kafka.table.KafkaConnectorOptionsUtil.StartupOptions; +import static org.apache.inlong.sort.kafka.table.KafkaConnectorOptionsUtil.autoCompleteSchemaRegistrySubject; +import static org.apache.inlong.sort.kafka.table.KafkaConnectorOptionsUtil.createKeyFormatProjection; +import static org.apache.inlong.sort.kafka.table.KafkaConnectorOptionsUtil.createValueFormatProjection; +import static org.apache.inlong.sort.kafka.table.KafkaConnectorOptionsUtil.getFlinkKafkaPartitioner; +import static org.apache.inlong.sort.kafka.table.KafkaConnectorOptionsUtil.getKafkaProperties; +import static org.apache.inlong.sort.kafka.table.KafkaConnectorOptionsUtil.getSourceTopicPattern; +import static org.apache.inlong.sort.kafka.table.KafkaConnectorOptionsUtil.getSourceTopics; +import static org.apache.inlong.sort.kafka.table.KafkaConnectorOptionsUtil.getStartupOptions; +import static org.apache.inlong.sort.kafka.table.KafkaConnectorOptionsUtil.validateTableSinkOptions; +import static org.apache.inlong.sort.kafka.table.KafkaConnectorOptionsUtil.validateTableSourceOptions; + +/** + * Factory for creating configured instances of {@link KafkaDynamicSource} and {@link + * KafkaDynamicSink}. 
+ */ +@Internal +public class KafkaDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory { + + private static final Logger LOG = LoggerFactory.getLogger(KafkaDynamicTableFactory.class); + + private static final ConfigOption<String> SINK_SEMANTIC = + ConfigOptions.key("sink.semantic") + .stringType() + .noDefaultValue() + .withDescription("Optional semantic when committing."); + + public static final String IDENTIFIER = "kafka-inlong"; + + @Override + public String factoryIdentifier() { + return IDENTIFIER; + } + + @Override + public Set<ConfigOption<?>> requiredOptions() { + final Set<ConfigOption<?>> options = new HashSet<>(); + options.add(PROPS_BOOTSTRAP_SERVERS); + return options; + } + + @Override + public Set<ConfigOption<?>> optionalOptions() { + final Set<ConfigOption<?>> options = new HashSet<>(); + options.add(FORMAT); + options.add(KEY_FORMAT); + options.add(KEY_FIELDS); + options.add(KEY_FIELDS_PREFIX); + options.add(VALUE_FORMAT); + options.add(VALUE_FIELDS_INCLUDE); + options.add(TOPIC); + options.add(TOPIC_PATTERN); + options.add(PROPS_GROUP_ID); + options.add(SCAN_STARTUP_MODE); + options.add(SCAN_STARTUP_SPECIFIC_OFFSETS); + options.add(SCAN_TOPIC_PARTITION_DISCOVERY); + options.add(SCAN_STARTUP_TIMESTAMP_MILLIS); + options.add(SINK_PARTITIONER); + options.add(SINK_PARALLELISM); + options.add(DELIVERY_GUARANTEE); + options.add(TRANSACTIONAL_ID_PREFIX); + options.add(SINK_SEMANTIC); + options.add(Constants.INLONG_METRIC); + options.add(Constants.INLONG_AUDIT); + options.add(Constants.AUDIT_KEYS); + options.add(Constants.SINK_MULTIPLE_FORMAT); + options.add(Constants.PATTERN_PARTITION_MAP); + options.add(Constants.DATASOURCE_PARTITION_MAP); + options.add(Constants.SINK_SCHEMA_CHANGE_ENABLE); + options.add(Constants.SINK_SCHEMA_CHANGE_POLICIES); + options.add(KafkaOptions.KAFKA_IGNORE_ALL_CHANGELOG); + options.add(KafkaOptions.SINK_MULTIPLE_PARTITION_PATTERN); + options.add(KafkaOptions.SINK_FIXED_IDENTIFIER); + return options; + } + + @Override + public Set<ConfigOption<?>> forwardOptions() { + return Stream.of( + PROPS_BOOTSTRAP_SERVERS, + PROPS_GROUP_ID, + TOPIC, + TOPIC_PATTERN, + SCAN_STARTUP_MODE, + SCAN_STARTUP_SPECIFIC_OFFSETS, + SCAN_TOPIC_PARTITION_DISCOVERY, + SCAN_STARTUP_TIMESTAMP_MILLIS, + SINK_PARTITIONER, + SINK_PARALLELISM, + TRANSACTIONAL_ID_PREFIX) + .collect(Collectors.toSet()); + } + + @Override + public DynamicTableSource createDynamicTableSource(Context context) { + final TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context); + + final Optional<DecodingFormat<DeserializationSchema<RowData>>> keyDecodingFormat = + getKeyDecodingFormat(helper); + + final DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat = + getValueDecodingFormat(helper); + + final ReadableConfig tableOptions = helper.getOptions(); + + final String valueFormatPrefix = tableOptions.getOptional(FORMAT).orElse(tableOptions.get(VALUE_FORMAT)); + LOG.info("valueFormatPrefix is {}", valueFormatPrefix); + helper.validateExcept(PROPERTIES_PREFIX, Constants.DIRTY_PREFIX, valueFormatPrefix); + + validateTableSourceOptions(tableOptions); + + validatePKConstraints( + context.getObjectIdentifier(), + context.getPrimaryKeyIndexes(), + context.getCatalogTable().getOptions(), + valueDecodingFormat); + + final StartupOptions startupOptions = getStartupOptions(tableOptions); + + final Properties properties = getKafkaProperties(context.getCatalogTable().getOptions()); + + // add topic-partition discovery + final Optional<Long> 
partitionDiscoveryInterval = + tableOptions.getOptional(SCAN_TOPIC_PARTITION_DISCOVERY).map(Duration::toMillis); + properties.setProperty( + KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS.key(), + partitionDiscoveryInterval.orElse(-1L).toString()); + + final DataType physicalDataType = context.getPhysicalRowDataType(); + + final int[] keyProjection = createKeyFormatProjection(tableOptions, physicalDataType); + + final int[] valueProjection = createValueFormatProjection(tableOptions, physicalDataType); + + final String keyPrefix = tableOptions.getOptional(KEY_FIELDS_PREFIX).orElse(null); + + return createKafkaTableSource( + physicalDataType, + keyDecodingFormat.orElse(null), + valueDecodingFormat, + keyProjection, + valueProjection, + keyPrefix, + getSourceTopics(tableOptions), + getSourceTopicPattern(tableOptions), + properties, + startupOptions.startupMode, + startupOptions.specificOffsets, + startupOptions.startupTimestampMillis, + context.getObjectIdentifier().asSummaryString()); + } + + @Override + public DynamicTableSink createDynamicTableSink(Context context) { + final TableFactoryHelper helper = + FactoryUtil.createTableFactoryHelper( + this, autoCompleteSchemaRegistrySubject(context)); + + final Optional<EncodingFormat<SerializationSchema<RowData>>> keyEncodingFormat = + getKeyEncodingFormat(helper); + + final EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat = + getValueEncodingFormat(helper); + + helper.validateExcept(KafkaConnectorOptionsUtil.PROPERTIES_PREFIX); + + final ReadableConfig tableOptions = helper.getOptions(); + + final DeliveryGuarantee deliveryGuarantee = validateDeprecatedSemantic(tableOptions); + validateTableSinkOptions(tableOptions); + + KafkaConnectorOptionsUtil.validateDeliveryGuarantee(tableOptions); + + validatePKConstraints( + context.getObjectIdentifier(), + context.getPrimaryKeyIndexes(), + context.getCatalogTable().getOptions(), + valueEncodingFormat); + + final DataType physicalDataType = context.getPhysicalRowDataType(); + + final int[] keyProjection = createKeyFormatProjection(tableOptions, physicalDataType); + + final int[] valueProjection = createValueFormatProjection(tableOptions, physicalDataType); + + final String keyPrefix = tableOptions.getOptional(KEY_FIELDS_PREFIX).orElse(null); + + final Integer parallelism = tableOptions.getOptional(SINK_PARALLELISM).orElse(null); + + return createKafkaTableSink( + physicalDataType, + keyEncodingFormat.orElse(null), + valueEncodingFormat, + keyProjection, + valueProjection, + keyPrefix, + tableOptions.get(TOPIC).get(0), + getKafkaProperties(context.getCatalogTable().getOptions()), + getFlinkKafkaPartitioner(tableOptions, context.getClassLoader()).orElse(null), + deliveryGuarantee, + parallelism, + tableOptions.get(TRANSACTIONAL_ID_PREFIX)); + } + + private static Optional<DecodingFormat<DeserializationSchema<RowData>>> getKeyDecodingFormat( + TableFactoryHelper helper) { + final Optional<DecodingFormat<DeserializationSchema<RowData>>> keyDecodingFormat = + helper.discoverOptionalDecodingFormat( + DeserializationFormatFactory.class, KEY_FORMAT); + keyDecodingFormat.ifPresent( + format -> { + if (!format.getChangelogMode().containsOnly(RowKind.INSERT)) { + throw new ValidationException( + String.format( + "A key format should only deal with INSERT-only records. 
" + + "But %s has a changelog mode of %s.", + helper.getOptions().get(KEY_FORMAT), + format.getChangelogMode())); + } + }); + return keyDecodingFormat; + } + + private static Optional<EncodingFormat<SerializationSchema<RowData>>> getKeyEncodingFormat( + TableFactoryHelper helper) { + final Optional<EncodingFormat<SerializationSchema<RowData>>> keyEncodingFormat = + helper.discoverOptionalEncodingFormat(SerializationFormatFactory.class, KEY_FORMAT); + keyEncodingFormat.ifPresent( + format -> { + if (!format.getChangelogMode().containsOnly(RowKind.INSERT)) { + throw new ValidationException( + String.format( + "A key format should only deal with INSERT-only records. " + + "But %s has a changelog mode of %s.", + helper.getOptions().get(KEY_FORMAT), + format.getChangelogMode())); + } + }); + return keyEncodingFormat; + } + + private static DecodingFormat<DeserializationSchema<RowData>> getValueDecodingFormat( + TableFactoryHelper helper) { + return helper.discoverOptionalDecodingFormat( + DeserializationFormatFactory.class, FORMAT) + .orElseGet( + () -> helper.discoverDecodingFormat( + DeserializationFormatFactory.class, VALUE_FORMAT)); + } + + private static EncodingFormat<SerializationSchema<RowData>> getValueEncodingFormat( + TableFactoryHelper helper) { + return helper.discoverOptionalEncodingFormat( + SerializationFormatFactory.class, FORMAT) + .orElseGet( + () -> helper.discoverEncodingFormat( + SerializationFormatFactory.class, VALUE_FORMAT)); + } + + private static void validatePKConstraints( + ObjectIdentifier tableName, + int[] primaryKeyIndexes, + Map<String, String> options, + Format format) { + if (primaryKeyIndexes.length > 0 + && format.getChangelogMode().containsOnly(RowKind.INSERT)) { + Configuration configuration = Configuration.fromMap(options); + String formatName = + configuration + .getOptional(FORMAT) + .orElse(configuration.get(VALUE_FORMAT)); + throw new ValidationException( + String.format( + "The Kafka table '%s' with '%s' format doesn't support defining PRIMARY KEY constraint" + + " on the table, because it can't guarantee the semantic of primary key.", + tableName.asSummaryString(), formatName)); + } + } + + private static DeliveryGuarantee validateDeprecatedSemantic(ReadableConfig tableOptions) { + if (tableOptions.getOptional(SINK_SEMANTIC).isPresent()) { + LOG.warn( + "{} is deprecated and will be removed. 
Please use {} instead.", + SINK_SEMANTIC.key(), + DELIVERY_GUARANTEE.key()); + return DeliveryGuarantee.valueOf( + tableOptions.get(SINK_SEMANTIC).toUpperCase().replace("-", "_")); + } + return tableOptions.get(DELIVERY_GUARANTEE); + } + + protected KafkaDynamicSource createKafkaTableSource( + DataType physicalDataType, + @Nullable DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat, + DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat, + int[] keyProjection, + int[] valueProjection, + @Nullable String keyPrefix, + @Nullable List<String> topics, + @Nullable Pattern topicPattern, + Properties properties, + StartupMode startupMode, + Map<KafkaTopicPartition, Long> specificStartupOffsets, + long startupTimestampMillis, + String tableIdentifier) { + return new KafkaDynamicSource( + physicalDataType, + keyDecodingFormat, + valueDecodingFormat, + keyProjection, + valueProjection, + keyPrefix, + topics, + topicPattern, + properties, + startupMode, + specificStartupOffsets, + startupTimestampMillis, + false, + tableIdentifier); + } + + protected KafkaDynamicSink createKafkaTableSink( + DataType physicalDataType, + @Nullable EncodingFormat<SerializationSchema<RowData>> keyEncodingFormat, + EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat, + int[] keyProjection, + int[] valueProjection, + @Nullable String keyPrefix, + String topic, + Properties properties, + FlinkKafkaPartitioner<RowData> partitioner, + DeliveryGuarantee deliveryGuarantee, + Integer parallelism, + @Nullable String transactionalIdPrefix) { + return new KafkaDynamicSink( + physicalDataType, + physicalDataType, + keyEncodingFormat, + valueEncodingFormat, + keyProjection, + valueProjection, + keyPrefix, + topic, + properties, + partitioner, + deliveryGuarantee, + false, + SinkBufferFlushMode.DISABLED, + parallelism, + transactionalIdPrefix); + } + +} diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/UpsertKafkaDynamicTableFactory.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/UpsertKafkaDynamicTableFactory.java new file mode 100644 index 0000000000..2f61265c32 --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/UpsertKafkaDynamicTableFactory.java @@ -0,0 +1,416 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.sort.kafka.table; + +import org.apache.inlong.sort.base.Constants; +import org.apache.inlong.sort.kafka.KafkaOptions; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.connector.base.DeliveryGuarantee; +import org.apache.flink.streaming.connectors.kafka.config.StartupMode; +import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions; +import org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSink; +import org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource; +import org.apache.flink.streaming.connectors.kafka.table.SinkBufferFlushMode; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.catalog.ResolvedCatalogTable; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.format.DecodingFormat; +import org.apache.flink.table.connector.format.EncodingFormat; +import org.apache.flink.table.connector.format.Format; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.factories.DeserializationFormatFactory; +import org.apache.flink.table.factories.DynamicTableSinkFactory; +import org.apache.flink.table.factories.DynamicTableSourceFactory; +import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.table.factories.SerializationFormatFactory; +import org.apache.flink.table.types.DataType; +import org.apache.flink.types.RowKind; + +import java.time.Duration; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Objects; +import java.util.Properties; +import java.util.Set; + +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FIELDS; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FIELDS_PREFIX; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FORMAT; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SINK_BUFFER_FLUSH_INTERVAL; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SINK_BUFFER_FLUSH_MAX_ROWS; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SINK_PARALLELISM; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TOPIC; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TRANSACTIONAL_ID_PREFIX; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.VALUE_FIELDS_INCLUDE; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.VALUE_FORMAT; +import static org.apache.inlong.sort.kafka.table.KafkaConnectorOptionsUtil.PROPERTIES_PREFIX; +import static org.apache.inlong.sort.kafka.table.KafkaConnectorOptionsUtil.autoCompleteSchemaRegistrySubject; +import static 
org.apache.inlong.sort.kafka.table.KafkaConnectorOptionsUtil.createKeyFormatProjection; +import static org.apache.inlong.sort.kafka.table.KafkaConnectorOptionsUtil.createValueFormatProjection; +import static org.apache.inlong.sort.kafka.table.KafkaConnectorOptionsUtil.getKafkaProperties; +import static org.apache.inlong.sort.kafka.table.KafkaConnectorOptionsUtil.getSourceTopicPattern; +import static org.apache.inlong.sort.kafka.table.KafkaConnectorOptionsUtil.getSourceTopics; + +/** Upsert-Kafka factory. */ +public class UpsertKafkaDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory { + + public static final String IDENTIFIER = "upsert-kafka-inlong"; + + @Override + public String factoryIdentifier() { + return IDENTIFIER; + } + + @Override + public Set<ConfigOption<?>> requiredOptions() { + final Set<ConfigOption<?>> options = new HashSet<>(); + options.add(PROPS_BOOTSTRAP_SERVERS); + options.add(TOPIC); + options.add(KEY_FORMAT); + options.add(VALUE_FORMAT); + return options; + } + + @Override + public Set<ConfigOption<?>> optionalOptions() { + final Set<ConfigOption<?>> options = new HashSet<>(); + options.add(KEY_FIELDS_PREFIX); + options.add(VALUE_FIELDS_INCLUDE); + options.add(SINK_PARALLELISM); + options.add(SINK_BUFFER_FLUSH_INTERVAL); + options.add(SINK_BUFFER_FLUSH_MAX_ROWS); + options.add(Constants.INLONG_METRIC); + options.add(KafkaOptions.KAFKA_IGNORE_ALL_CHANGELOG); + options.add(KafkaOptions.SINK_MULTIPLE_PARTITION_PATTERN); + options.add(KafkaOptions.SINK_FIXED_IDENTIFIER); + options.add(KafkaConnectorOptions.SINK_PARTITIONER); + return options; + } + + @Override + public DynamicTableSource createDynamicTableSource(Context context) { + FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context); + + ReadableConfig tableOptions = helper.getOptions(); + DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat = + helper.discoverDecodingFormat(DeserializationFormatFactory.class, KEY_FORMAT); + DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat = + helper.discoverDecodingFormat(DeserializationFormatFactory.class, VALUE_FORMAT); + + // Validate the option data type. 
+ helper.validateExcept(PROPERTIES_PREFIX, Constants.DIRTY_PREFIX); + validateSource( + tableOptions, + keyDecodingFormat, + valueDecodingFormat, + context.getPrimaryKeyIndexes()); + + Tuple2<int[], int[]> keyValueProjections = + createKeyValueProjections(context.getCatalogTable()); + String keyPrefix = tableOptions.getOptional(KEY_FIELDS_PREFIX).orElse(null); + Properties properties = getKafkaProperties(context.getCatalogTable().getOptions()); + // always use earliest to keep data integrity + StartupMode earliest = StartupMode.EARLIEST; + + return new KafkaDynamicSource( + context.getPhysicalRowDataType(), + keyDecodingFormat, + new DecodingFormatWrapper(valueDecodingFormat), + keyValueProjections.f0, + keyValueProjections.f1, + keyPrefix, + getSourceTopics(tableOptions), + getSourceTopicPattern(tableOptions), + properties, + earliest, + Collections.emptyMap(), + 0, + true, + context.getObjectIdentifier().asSummaryString()); + } + + @Override + public DynamicTableSink createDynamicTableSink(Context context) { + FactoryUtil.TableFactoryHelper helper = + FactoryUtil.createTableFactoryHelper( + this, autoCompleteSchemaRegistrySubject(context)); + + final ReadableConfig tableOptions = helper.getOptions(); + + EncodingFormat<SerializationSchema<RowData>> keyEncodingFormat = + helper.discoverEncodingFormat(SerializationFormatFactory.class, KEY_FORMAT); + EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat = + helper.discoverEncodingFormat(SerializationFormatFactory.class, VALUE_FORMAT); + + // Validate the option data type. + helper.validateExcept(PROPERTIES_PREFIX); + validateSink( + tableOptions, + keyEncodingFormat, + valueEncodingFormat, + context.getPrimaryKeyIndexes()); + + Tuple2<int[], int[]> keyValueProjections = + createKeyValueProjections(context.getCatalogTable()); + final String keyPrefix = tableOptions.getOptional(KEY_FIELDS_PREFIX).orElse(null); + final Properties properties = getKafkaProperties(context.getCatalogTable().getOptions()); + + Integer parallelism = tableOptions.get(SINK_PARALLELISM); + + int batchSize = tableOptions.get(SINK_BUFFER_FLUSH_MAX_ROWS); + Duration batchInterval = tableOptions.get(SINK_BUFFER_FLUSH_INTERVAL); + SinkBufferFlushMode flushMode = + new SinkBufferFlushMode(batchSize, batchInterval.toMillis()); + + // use {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner}. + // it will use hash partition if key is set else in round-robin behaviour. 
+ return new KafkaDynamicSink( + context.getPhysicalRowDataType(), + context.getPhysicalRowDataType(), + keyEncodingFormat, + new EncodingFormatWrapper(valueEncodingFormat), + keyValueProjections.f0, + keyValueProjections.f1, + keyPrefix, + tableOptions.get(TOPIC).get(0), + properties, + null, + DeliveryGuarantee.AT_LEAST_ONCE, + true, + flushMode, + parallelism, + tableOptions.get(TRANSACTIONAL_ID_PREFIX)); + } + + private Tuple2<int[], int[]> createKeyValueProjections(ResolvedCatalogTable catalogTable) { + ResolvedSchema schema = catalogTable.getResolvedSchema(); + // the primary key should be validated earlier + List<String> keyFields = schema.getPrimaryKey().get().getColumns(); + DataType physicalDataType = schema.toPhysicalRowDataType(); + + Configuration tableOptions = Configuration.fromMap(catalogTable.getOptions()); + // upsert-kafka will set key.fields to primary key fields by default + tableOptions.set(KEY_FIELDS, keyFields); + + int[] keyProjection = createKeyFormatProjection(tableOptions, physicalDataType); + int[] valueProjection = createValueFormatProjection(tableOptions, physicalDataType); + + return Tuple2.of(keyProjection, valueProjection); + } + + // -------------------------------------------------------------------------------------------- + // Validation + // -------------------------------------------------------------------------------------------- + + private static void validateSource( + ReadableConfig tableOptions, + Format keyFormat, + Format valueFormat, + int[] primaryKeyIndexes) { + validateTopic(tableOptions); + validateFormat(keyFormat, valueFormat, tableOptions); + validatePKConstraints(primaryKeyIndexes); + } + + private static void validateSink( + ReadableConfig tableOptions, + Format keyFormat, + Format valueFormat, + int[] primaryKeyIndexes) { + validateTopic(tableOptions); + validateFormat(keyFormat, valueFormat, tableOptions); + validatePKConstraints(primaryKeyIndexes); + validateSinkBufferFlush(tableOptions); + } + + private static void validateTopic(ReadableConfig tableOptions) { + List<String> topic = tableOptions.get(TOPIC); + if (topic.size() > 1) { + throw new ValidationException( + "The 'upsert-kafka' connector doesn't support topic list now. " + + "Please use single topic as the value of the parameter 'topic'."); + } + } + + private static void validateFormat( + Format keyFormat, Format valueFormat, ReadableConfig tableOptions) { + if (!keyFormat.getChangelogMode().containsOnly(RowKind.INSERT)) { + String identifier = tableOptions.get(KEY_FORMAT); + throw new ValidationException( + String.format( + "'upsert-kafka' connector doesn't support '%s' as key format, " + + "because '%s' is not in insert-only mode.", + identifier, identifier)); + } + if (!valueFormat.getChangelogMode().containsOnly(RowKind.INSERT)) { + String identifier = tableOptions.get(VALUE_FORMAT); + throw new ValidationException( + String.format( + "'upsert-kafka' connector doesn't support '%s' as value format, " + + "because '%s' is not in insert-only mode.", + identifier, identifier)); + } + } + + private static void validatePKConstraints(int[] schema) { + if (schema.length == 0) { + throw new ValidationException( + "'upsert-kafka' tables require to define a PRIMARY KEY constraint. " + + "The PRIMARY KEY specifies which columns should be read from or write to the Kafka message key. 
" + + "The PRIMARY KEY also defines records in the 'upsert-kafka' table should update or delete on which keys."); + } + } + + private static void validateSinkBufferFlush(ReadableConfig tableOptions) { + int flushMaxRows = tableOptions.get(SINK_BUFFER_FLUSH_MAX_ROWS); + long flushIntervalMs = tableOptions.get(SINK_BUFFER_FLUSH_INTERVAL).toMillis(); + if (flushMaxRows > 0 && flushIntervalMs > 0) { + // flush is enabled + return; + } + if (flushMaxRows <= 0 && flushIntervalMs <= 0) { + // flush is disabled + return; + } + // one of them is set which is not allowed + throw new ValidationException( + String.format( + "'%s' and '%s' must be set to be greater than zero together to enable sink buffer flushing.", + SINK_BUFFER_FLUSH_MAX_ROWS.key(), SINK_BUFFER_FLUSH_INTERVAL.key())); + } + + // -------------------------------------------------------------------------------------------- + // Format wrapper + // -------------------------------------------------------------------------------------------- + + /** + * It is used to wrap the decoding format and expose the desired changelog mode. It only works + * for insert-only formats. + */ + protected static class DecodingFormatWrapper + implements + DecodingFormat<DeserializationSchema<RowData>> { + + private final DecodingFormat<DeserializationSchema<RowData>> innerDecodingFormat; + + private static final ChangelogMode SOURCE_CHANGELOG_MODE = + ChangelogMode.newBuilder() + .addContainedKind(RowKind.UPDATE_AFTER) + .addContainedKind(RowKind.DELETE) + .build(); + + public DecodingFormatWrapper( + DecodingFormat<DeserializationSchema<RowData>> innerDecodingFormat) { + this.innerDecodingFormat = innerDecodingFormat; + } + + @Override + public DeserializationSchema<RowData> createRuntimeDecoder( + DynamicTableSource.Context context, DataType producedDataType) { + return innerDecodingFormat.createRuntimeDecoder(context, producedDataType); + } + + @Override + public ChangelogMode getChangelogMode() { + return SOURCE_CHANGELOG_MODE; + } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } + + if (obj == null || getClass() != obj.getClass()) { + return false; + } + + DecodingFormatWrapper that = (DecodingFormatWrapper) obj; + return Objects.equals(innerDecodingFormat, that.innerDecodingFormat); + } + + @Override + public int hashCode() { + return Objects.hash(innerDecodingFormat); + } + } + + /** + * It is used to wrap the encoding format and expose the desired changelog mode. It only works + * for insert-only formats. 
+ */ + protected static class EncodingFormatWrapper + implements + EncodingFormat<SerializationSchema<RowData>> { + + private final EncodingFormat<SerializationSchema<RowData>> innerEncodingFormat; + + public static final ChangelogMode SINK_CHANGELOG_MODE = + ChangelogMode.newBuilder() + .addContainedKind(RowKind.INSERT) + .addContainedKind(RowKind.UPDATE_AFTER) + .addContainedKind(RowKind.DELETE) + .build(); + + public EncodingFormatWrapper( + EncodingFormat<SerializationSchema<RowData>> innerEncodingFormat) { + this.innerEncodingFormat = innerEncodingFormat; + } + + @Override + public SerializationSchema<RowData> createRuntimeEncoder( + DynamicTableSink.Context context, DataType consumedDataType) { + return innerEncodingFormat.createRuntimeEncoder(context, consumedDataType); + } + + @Override + public ChangelogMode getChangelogMode() { + return SINK_CHANGELOG_MODE; + } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } + + if (obj == null || getClass() != obj.getClass()) { + return false; + } + + EncodingFormatWrapper that = (EncodingFormatWrapper) obj; + return Objects.equals(innerEncodingFormat, that.innerEncodingFormat); + } + + @Override + public int hashCode() { + return Objects.hash(innerEncodingFormat); + } + } +} diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory new file mode 100644 index 0000000000..7eeb7cd7a8 --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory @@ -0,0 +1,17 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.inlong.sort.kafka.table.KafkaDynamicTableFactory +org.apache.inlong.sort.kafka.table.UpsertKafkaDynamicTableFactory \ No newline at end of file diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pom.xml b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pom.xml index 38c475a573..dd789c6bd5 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pom.xml +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pom.xml @@ -42,6 +42,7 @@ <module>tubemq</module> <module>hbase</module> <module>hudi</module> + <module>kafka</module> </modules> <properties> diff --git a/licenses/inlong-sort-connectors/LICENSE b/licenses/inlong-sort-connectors/LICENSE index 3ac170a2d3..ec46cd86db 100644 --- a/licenses/inlong-sort-connectors/LICENSE +++ b/licenses/inlong-sort-connectors/LICENSE @@ -815,6 +815,12 @@ Source : flink-connector-hbase-2.2 1.15.4 (Please note that the software have been modified.) 
License : https://github.com/apache/flink/blob/master/LICENSE +1.3.20 inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java + inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/UpsertKafkaDynamicTableFactory.java + inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaConnectorOptionsUtil.java + Source : org.apache.flink:flink-connector-kafka:1.15.4 (Please note that the software have been modified.) + License : https://github.com/apache/flink/blob/master/LICENSE + ======================================================================= Apache InLong Subcomponents: diff --git a/pom.xml b/pom.xml index 65086684c4..3d03a436ad 100644 --- a/pom.xml +++ b/pom.xml @@ -1192,6 +1192,12 @@ <version>${testcontainers.version}</version> <scope>test</scope> </dependency> + <dependency> + <groupId>org.testcontainers</groupId> + <artifactId>jdbc</artifactId> + <version>${testcontainers.version}</version> + <scope>test</scope> + </dependency> <dependency> <groupId>org.testcontainers</groupId> <artifactId>mysql</artifactId> @@ -1200,7 +1206,19 @@ </dependency> <dependency> <groupId>org.testcontainers</groupId> - <artifactId>jdbc</artifactId> + <artifactId>postgresql</artifactId> + <version>${testcontainers.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.testcontainers</groupId> + <artifactId>mongodb</artifactId> + <version>${testcontainers.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.testcontainers</groupId> + <artifactId>clickhouse</artifactId> <version>${testcontainers.version}</version> <scope>test</scope> </dependency>
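
Usage sketch for the new 'kafka-inlong' identifier registered by KafkaDynamicTableFactory above: a minimal Flink 1.15 Table API program that declares a table with 'connector' = 'kafka-inlong' and reads JSON records. This is not part of the patch; the topic, broker address, schema, and the package/class names are illustrative assumptions, while the option keys ('topic', 'properties.*', 'scan.startup.mode', 'format') are the upstream KafkaConnectorOptions keys the factory forwards.

    package org.apache.inlong.sort.kafka.example; // hypothetical package, not in this commit

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    /** Minimal sketch: read JSON records through the kafka-inlong table factory. */
    public class KafkaInlongSourceSketch {

        public static void main(String[] args) {
            // Pure Table API program; no DataStream bridge is needed for this sketch.
            TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

            // 'connector' = 'kafka-inlong' resolves to KafkaDynamicTableFactory.IDENTIFIER;
            // the topic, brokers, and columns below are placeholders.
            tEnv.executeSql(
                    "CREATE TABLE kafka_source (\n"
                            + "  id BIGINT,\n"
                            + "  name STRING\n"
                            + ") WITH (\n"
                            + "  'connector' = 'kafka-inlong',\n"
                            + "  'topic' = 'test-topic',\n"
                            + "  'properties.bootstrap.servers' = 'localhost:9092',\n"
                            + "  'properties.group.id' = 'inlong-demo',\n"
                            + "  'scan.startup.mode' = 'earliest-offset',\n"
                            + "  'format' = 'json'\n"
                            + ")");

            // Collect and print rows; a real job would INSERT INTO a sink table instead.
            tEnv.executeSql("SELECT id, name FROM kafka_source").print();
        }
    }

The InLong-specific options declared in optionalOptions() (metric, audit, schema-change and multiple-format settings, plus the KafkaOptions additions) can be appended to the same WITH clause; they are omitted here because their key strings are defined outside this excerpt.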
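A corresponding sketch for 'upsert-kafka-inlong', whose DDL has to satisfy the validations shown above: a declared PRIMARY KEY (validatePKConstraints), a single topic (validateTopic), and the two sink.buffer-flush options enabled together (validateSinkBufferFlush). Table, topic, and column names are again assumptions, not taken from this patch.

    package org.apache.inlong.sort.kafka.example; // hypothetical package, not in this commit

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    /** Minimal sketch: write an upserted result through the upsert-kafka-inlong factory. */
    public class UpsertKafkaInlongSinkSketch {

        public static void main(String[] args) throws Exception {
            TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

            // PRIMARY KEY is mandatory and selects the Kafka message key; only one topic may be
            // listed; both buffer-flush options are greater than zero together, as required.
            tEnv.executeSql(
                    "CREATE TABLE upsert_sink (\n"
                            + "  user_id BIGINT,\n"
                            + "  cnt BIGINT,\n"
                            + "  PRIMARY KEY (user_id) NOT ENFORCED\n"
                            + ") WITH (\n"
                            + "  'connector' = 'upsert-kafka-inlong',\n"
                            + "  'topic' = 'user-cnt',\n"
                            + "  'properties.bootstrap.servers' = 'localhost:9092',\n"
                            + "  'key.format' = 'json',\n"
                            + "  'value.format' = 'json',\n"
                            + "  'sink.buffer-flush.max-rows' = '1000',\n"
                            + "  'sink.buffer-flush.interval' = '1s'\n"
                            + ")");

            // Rows sharing a user_id are treated as upserts on the message key by downstream readers.
            tEnv.executeSql("INSERT INTO upsert_sink VALUES (1, 10), (2, 5), (1, 11)").await();
        }
    }

Note that createDynamicTableSink hard-codes DeliveryGuarantee.AT_LEAST_ONCE and the upsert flag for this factory, so, as in upstream upsert-kafka, deletions are expected to surface as Kafka tombstone records rather than as explicit changelog rows.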