[ https://issues.apache.org/jira/browse/FLINK-10843?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16689587#comment-16689587 ]
ASF GitHub Bot commented on FLINK-10843:
----------------------------------------

asfgit closed pull request #7087: [FLINK-10843] [connectors] Make Kafka table factory versioning more flexible
URL: https://github.com/apache/flink/pull/7087

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

diff --git a/docs/dev/table/connect.md b/docs/dev/table/connect.md
index 5f1112706ce..effd913707e 100644
--- a/docs/dev/table/connect.md
+++ b/docs/dev/table/connect.md
@@ -40,14 +40,15 @@ The following table list all available connectors and formats. Their mutual comp
 
 ### Connectors
 
-| Name | Version | Maven dependency | SQL Client JAR |
-| :---------------- | :------------ | :--------------------------- | :----------------------|
-| Filesystem | | Built-in | Built-in |
-| Elasticsearch | 6 | `flink-connector-elasticsearch6` | [Download](http://central.maven.org/maven2/org/apache/flink/flink-connector-elasticsearch6{{site.scala_version_suffix}}/{{site.version}}/flink-connector-elasticsearch6{{site.scala_version_suffix}}-{{site.version}}-sql-jar.jar) |
-| Apache Kafka | 0.8 | `flink-connector-kafka-0.8` | Not available |
-| Apache Kafka | 0.9 | `flink-connector-kafka-0.9` | [Download](http://central.maven.org/maven2/org/apache/flink/flink-connector-kafka-0.9{{site.scala_version_suffix}}/{{site.version}}/flink-connector-kafka-0.9{{site.scala_version_suffix}}-{{site.version}}-sql-jar.jar) |
-| Apache Kafka | 0.10 | `flink-connector-kafka-0.10` | [Download](http://central.maven.org/maven2/org/apache/flink/flink-connector-kafka-0.10{{site.scala_version_suffix}}/{{site.version}}/flink-connector-kafka-0.10{{site.scala_version_suffix}}-{{site.version}}-sql-jar.jar) |
-| Apache Kafka | 0.11 | `flink-connector-kafka-0.11` | [Download](http://central.maven.org/maven2/org/apache/flink/flink-connector-kafka-0.11{{site.scala_version_suffix}}/{{site.version}}/flink-connector-kafka-0.11{{site.scala_version_suffix}}-{{site.version}}-sql-jar.jar) |
+| Name | Version | Maven dependency | SQL Client JAR |
+| :---------------- | :------------------ | :--------------------------- | :----------------------|
+| Filesystem | | Built-in | Built-in |
+| Elasticsearch | 6 | `flink-connector-elasticsearch6` | [Download](http://central.maven.org/maven2/org/apache/flink/flink-connector-elasticsearch6{{site.scala_version_suffix}}/{{site.version}}/flink-connector-elasticsearch6{{site.scala_version_suffix}}-{{site.version}}-sql-jar.jar) |
+| Apache Kafka | 0.8 | `flink-connector-kafka-0.8` | Not available |
+| Apache Kafka | 0.9 | `flink-connector-kafka-0.9` | [Download](http://central.maven.org/maven2/org/apache/flink/flink-connector-kafka-0.9{{site.scala_version_suffix}}/{{site.version}}/flink-connector-kafka-0.9{{site.scala_version_suffix}}-{{site.version}}-sql-jar.jar) |
+| Apache Kafka | 0.10 | `flink-connector-kafka-0.10` | [Download](http://central.maven.org/maven2/org/apache/flink/flink-connector-kafka-0.10{{site.scala_version_suffix}}/{{site.version}}/flink-connector-kafka-0.10{{site.scala_version_suffix}}-{{site.version}}-sql-jar.jar) |
+| Apache Kafka | 0.11 | `flink-connector-kafka-0.11` | [Download](http://central.maven.org/maven2/org/apache/flink/flink-connector-kafka-0.11{{site.scala_version_suffix}}/{{site.version}}/flink-connector-kafka-0.11{{site.scala_version_suffix}}-{{site.version}}-sql-jar.jar) |
+| Apache Kafka | 0.11+ (`universal`) | `flink-connector-kafka` | [Download](http://central.maven.org/maven2/org/apache/flink/flink-connector-kafka{{site.scala_version_suffix}}/{{site.version}}/flink-connector-kafka{{site.scala_version_suffix}}-{{site.version}}-sql-jar.jar) |
 
 ### Formats
 
@@ -524,7 +525,8 @@ The Kafka connector allows for reading and writing from and to an Apache Kafka t
 {% highlight java %}
 .connect(
   new Kafka()
-    .version("0.11") // required: valid connector versions are "0.8", "0.9", "0.10", and "0.11"
+    .version("0.11") // required: valid connector versions are
+                     //   "0.8", "0.9", "0.10", "0.11", and "universal"
     .topic("...")    // required: topic name from which the table is read
 
     // optional: connector specific properties
@@ -549,7 +551,8 @@ The Kafka connector allows for reading and writing from and to an Apache Kafka t
 {% highlight yaml %}
 connector:
   type: kafka
-  version: "0.11"    # required: valid connector versions are "0.8", "0.9", "0.10", and "0.11"
+  version: "0.11"    # required: valid connector versions are
+                     #   "0.8", "0.9", "0.10", "0.11", and "universal"
   topic: ...         # required: topic name from which the table is read
 
   properties:        # optional: connector specific properties
@@ -583,7 +586,9 @@ connector:
 
 **Consistency guarantees:** By default, a Kafka sink ingests data with at-least-once guarantees into a Kafka topic if the query is executed with [checkpointing enabled]({{ site.baseurl }}/dev/stream/state/checkpointing.html#enabling-and-configuring-checkpointing).
 
-**Kafka 0.10+ Timestamps:** Since Kafka 0.10, Kafka messages have a timestamp as metadata that specifies when the record was written into the Kafka topic. These timestamps can be used for a [rowtime attribute](connect.html#defining-the-schema) by selecting `timestamps: from-source` in YAML and `timestampsFromSource()` in Java/Scala respectively.
+**Kafka 0.10+ Timestamps:** Since Kafka 0.10, Kafka messages have a timestamp as metadata that specifies when the record was written into the Kafka topic. These timestamps can be used for a [rowtime attribute](connect.html#defining-the-schema) by selecting `timestamps: from-source` in YAML and `timestampsFromSource()` in Java/Scala respectively.
+
+**Kafka 0.11+ Versioning:** Since Flink 1.7, the Kafka connector definition should be independent of a hard-coded Kafka version.
Use the connector version `universal` as a wildcard for Flink's Kafka connector that is compatible with all Kafka versions starting from 0.11. Make sure to add the version-specific Kafka dependency. In addition, a corresponding format needs to be specified for reading and writing rows from and to Kafka.

diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java
index 86d7ef6d35b..0dbbbeb348a 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java
@@ -21,9 +21,7 @@
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
 
-import java.util.Arrays;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.function.Consumer;
 
@@ -40,7 +38,7 @@
 	public static final String CONNECTOR_VERSION_VALUE_09 = "0.9";
 	public static final String CONNECTOR_VERSION_VALUE_010 = "0.10";
 	public static final String CONNECTOR_VERSION_VALUE_011 = "0.11";
-	public static final String CONNECTOR_VERSION_VALUE_20 = "2.0";
+	public static final String CONNECTOR_VERSION_VALUE_UNIVERSAL = "universal";
 	public static final String CONNECTOR_TOPIC = "connector.topic";
 	public static final String CONNECTOR_STARTUP_MODE = "connector.startup-mode";
 	public static final String CONNECTOR_STARTUP_MODE_VALUE_EARLIEST = "earliest-offset";
@@ -64,7 +62,7 @@ public void validate(DescriptorProperties properties) {
 		super.validate(properties);
 		properties.validateValue(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE_KAFKA, false);
-		validateVersion(properties);
+		properties.validateString(CONNECTOR_TOPIC, false, 1, Integer.MAX_VALUE);
 		validateStartupMode(properties);
@@ -73,17 +71,6 @@ public void validate(DescriptorProperties properties) {
 		validateSinkPartitioner(properties);
 	}
 
-	private void validateVersion(DescriptorProperties properties) {
-		final List<String> versions = Arrays.asList(
-			CONNECTOR_VERSION_VALUE_08,
-			CONNECTOR_VERSION_VALUE_09,
-			CONNECTOR_VERSION_VALUE_010,
-			CONNECTOR_VERSION_VALUE_011,
-			CONNECTOR_VERSION_VALUE_20);
-		properties.validateEnumValues(CONNECTOR_VERSION, false, versions);
-		properties.validateString(CONNECTOR_TOPIC, false, 1, Integer.MAX_VALUE);
-	}
-
 	private void validateStartupMode(DescriptorProperties properties) {
 		final Map<String, Consumer<String>> specificOffsetValidators = new HashMap<>();
 		specificOffsetValidators.put(

diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactory.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactory.java
index b0dfc54e671..2b498674a89 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactory.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactory.java
@@ -39,7 +39,7 @@
 	@Override
 	protected String kafkaVersion() {
-		return KafkaValidator.CONNECTOR_VERSION_VALUE_20;
+		return KafkaValidator.CONNECTOR_VERSION_VALUE_UNIVERSAL;
 	}
 
 	@Override

diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTest.java
index 5043880b21e..4d843bcd82e 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTest.java
+++
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTest.java
@@ -40,7 +40,7 @@
 	@Override
 	protected String getKafkaVersion() {
-		return KafkaValidator.CONNECTOR_VERSION_VALUE_20;
+		return KafkaValidator.CONNECTOR_VERSION_VALUE_UNIVERSAL;
 	}
 
 	@Override

diff --git a/flink-end-to-end-tests/test-scripts/kafka_sql_common.sh b/flink-end-to-end-tests/test-scripts/kafka_sql_common.sh
index 3e95b405b10..468f0587db7 100644
--- a/flink-end-to-end-tests/test-scripts/kafka_sql_common.sh
+++ b/flink-end-to-end-tests/test-scripts/kafka_sql_common.sh
@@ -21,6 +21,7 @@
 KAFKA_CONNECTOR_VERSION="$1"
 KAFKA_VERSION="$2"
 CONFLUENT_VERSION="$3"
 CONFLUENT_MAJOR_VERSION="$4"
+KAFKA_SQL_VERSION="$5"
 
 source "$(dirname "$0")"/kafka-common.sh $2 $3 $4
 
@@ -64,7 +65,7 @@ function get_kafka_json_source_schema {
       type: ROW<type VARCHAR, message VARCHAR>
     connector:
       type: kafka
-      version: "$KAFKA_CONNECTOR_VERSION"
+      version: "$KAFKA_SQL_VERSION"
      topic: $topicName
      startup-mode: earliest-offset
      properties:

diff --git a/flink-end-to-end-tests/test-scripts/test_sql_client.sh b/flink-end-to-end-tests/test-scripts/test_sql_client.sh
index 74192b40dfb..5dd68838ba7 100755
--- a/flink-end-to-end-tests/test-scripts/test_sql_client.sh
+++ b/flink-end-to-end-tests/test-scripts/test_sql_client.sh
@@ -23,9 +23,15 @@
 KAFKA_CONNECTOR_VERSION="2.0"
 KAFKA_VERSION="2.0.0"
 CONFLUENT_VERSION="5.0.0"
 CONFLUENT_MAJOR_VERSION="5.0"
+KAFKA_SQL_VERSION="universal"
 
 source "$(dirname "$0")"/common.sh
-source "$(dirname "$0")"/kafka_sql_common.sh $KAFKA_CONNECTOR_VERSION $KAFKA_VERSION $CONFLUENT_VERSION $CONFLUENT_MAJOR_VERSION
+source "$(dirname "$0")"/kafka_sql_common.sh \
+  $KAFKA_CONNECTOR_VERSION \
+  $KAFKA_VERSION \
+  $CONFLUENT_VERSION \
+  $CONFLUENT_MAJOR_VERSION \
+  $KAFKA_SQL_VERSION
 source "$(dirname "$0")"/elasticsearch-common.sh
 
 SQL_TOOLBOX_JAR=$END_TO_END_DIR/flink-sql-client-test/target/SqlToolbox.jar

diff --git a/flink-end-to-end-tests/test-scripts/test_sql_client_kafka.sh b/flink-end-to-end-tests/test-scripts/test_sql_client_kafka.sh
index a05dc050a60..94e89a2b1e8 100755
--- a/flink-end-to-end-tests/test-scripts/test_sql_client_kafka.sh
+++ b/flink-end-to-end-tests/test-scripts/test_sql_client_kafka.sh
@@ -19,4 +19,4 @@
 set -Eeuo pipefail
 
-source "$(dirname "$0")"/test_sql_client_kafka_common.sh 2.0 2.0.0 5.0.0 5.0 "kafka"
+source "$(dirname "$0")"/test_sql_client_kafka_common.sh 2.0 2.0.0 5.0.0 5.0 "kafka" "universal"

diff --git a/flink-end-to-end-tests/test-scripts/test_sql_client_kafka010.sh b/flink-end-to-end-tests/test-scripts/test_sql_client_kafka010.sh
index c710abc72bc..66bef6623a1 100755
--- a/flink-end-to-end-tests/test-scripts/test_sql_client_kafka010.sh
+++ b/flink-end-to-end-tests/test-scripts/test_sql_client_kafka010.sh
@@ -19,4 +19,4 @@
 set -Eeuo pipefail
 
-source "$(dirname "$0")"/test_sql_client_kafka_common.sh 0.10 0.10.2.0 3.2.0 3.2 "kafka-0.10"
+source "$(dirname "$0")"/test_sql_client_kafka_common.sh 0.10 0.10.2.0 3.2.0 3.2 "kafka-0.10" "0.10"

diff --git a/flink-end-to-end-tests/test-scripts/test_sql_client_kafka_common.sh b/flink-end-to-end-tests/test-scripts/test_sql_client_kafka_common.sh
index 149c86f570e..08ed59b419c 100755
--- a/flink-end-to-end-tests/test-scripts/test_sql_client_kafka_common.sh
+++ b/flink-end-to-end-tests/test-scripts/test_sql_client_kafka_common.sh
@@ -22,9 +22,15 @@
 KAFKA_VERSION="$2"
 CONFLUENT_VERSION="$3"
 CONFLUENT_MAJOR_VERSION="$4"
 KAFKA_SQL_JAR="$5"
+KAFKA_SQL_VERSION="$6"
 
 source "$(dirname "$0")"/common.sh
-source "$(dirname "$0")"/kafka_sql_common.sh $KAFKA_CONNECTOR_VERSION $KAFKA_VERSION $CONFLUENT_VERSION $CONFLUENT_MAJOR_VERSION
+source "$(dirname "$0")"/kafka_sql_common.sh \
+  $KAFKA_CONNECTOR_VERSION \
+  $KAFKA_VERSION \
+  $CONFLUENT_VERSION \
+  $CONFLUENT_MAJOR_VERSION \
+  $KAFKA_SQL_VERSION
 
 ################################################################################
 # Prepare connectors
@@ -98,7 +104,7 @@ cat >> $SQL_CONF << EOF
         type: BIGINT
     connector:
       type: kafka
-      version: "$KAFKA_CONNECTOR_VERSION"
+      version: "$KAFKA_SQL_VERSION"
      topic: test-avro
      startup-mode: earliest-offset
      properties:

----------------------------------------------------------------

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

> Make Kafka version definition more flexible for new Kafka table factory
> -----------------------------------------------------------------------
>
>                 Key: FLINK-10843
>                 URL: https://issues.apache.org/jira/browse/FLINK-10843
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector, Table API & SQL
>    Affects Versions: 1.7.0
>            Reporter: Timo Walther
>            Assignee: Timo Walther
>            Priority: Major
>              Labels: pull-request-available
>
> Currently, a user has to specify a specific version for a Kafka connector like:
> {code}
> connector:
>   type: kafka
>   version: "0.11"    # required: valid connector versions are "0.8", "0.9", "0.10", and "0.11"
>   topic: ...         # required: topic name from which the table is read
> {code}
> However, the new Kafka connector aims to be universal; thus, at least the 1.x and 2.x versions should be supported as parameters as well. Currently, {{2.0}} is the only accepted string for the factory.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
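The net effect of the patch above is that the set of accepted `connector.version` strings becomes the concrete Kafka versions plus `universal`, which replaces the previously hard-coded `2.0`. A minimal self-contained sketch of that membership check, with a hypothetical class and method name (the real validation lives in Flink's `KafkaValidator` and `DescriptorProperties`):

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified stand-in for the connector version check.
public class KafkaVersionCheck {

    // Accepted values after FLINK-10843: concrete versions plus "universal"
    // (which replaces the previously hard-coded "2.0").
    private static final List<String> VALID_VERSIONS =
            Arrays.asList("0.8", "0.9", "0.10", "0.11", "universal");

    // Returns true if the given connector version string is accepted.
    public static boolean isValidVersion(String version) {
        return VALID_VERSIONS.contains(version);
    }

    public static void main(String[] args) {
        System.out.println(isValidVersion("universal")); // true
        System.out.println(isValidVersion("2.0"));       // false
    }
}
```

Note that `universal` is deliberately version-agnostic on the connector side; the matching Kafka client dependency still has to be chosen by the user.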