[ https://issues.apache.org/jira/browse/FLINK-10624?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16687754#comment-16687754 ]
ASF GitHub Bot commented on FLINK-10624: ---------------------------------------- asfgit closed pull request #6927: [FLINK-10624] Extend SQL client end-to-end to test new KafkaTableSink URL: https://github.com/apache/flink/pull/6927 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-end-to-end-tests/flink-sql-client-test/pom.xml b/flink-end-to-end-tests/flink-sql-client-test/pom.xml index b3a48697f6c..17a26a39cd6 100644 --- a/flink-end-to-end-tests/flink-sql-client-test/pom.xml +++ b/flink-end-to-end-tests/flink-sql-client-test/pom.xml @@ -89,6 +89,14 @@ under the License. <classifier>sql-jar</classifier> <scope>provided</scope> </dependency> + <dependency> + <!-- Used by maven-dependency-plugin --> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <classifier>sql-jar</classifier> + <scope>provided</scope> + </dependency> <dependency> <!-- Used by maven-dependency-plugin --> <groupId>org.apache.flink</groupId> @@ -106,7 +114,7 @@ under the License. as we neither access nor package the kafka dependencies --> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> - <version>0.11.0.2</version> + <version>2.0.0</version> </dependency> </dependencies> </dependencyManagement> @@ -130,19 +138,19 @@ under the License. </executions> </plugin> - <!-- Copy SQL jars into dedicated "sql-jars" directory. --> + <!-- Copy SQL jars into dedicated "sql-jars-kafka-0.10" directory. 
--> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-dependency-plugin</artifactId> <executions> <execution> - <id>copy</id> + <id>copy-for-sql-jars-kafka-0.10</id> <phase>package</phase> <goals> <goal>copy</goal> </goals> <configuration> - <outputDirectory>${project.build.directory}/sql-jars</outputDirectory> + <outputDirectory>${project.build.directory}/sql-jars-kafka-0.10</outputDirectory> <!-- List of currently provided SQL jars. When extending this list please also add a dependency for the respective module. --> @@ -196,6 +204,73 @@ under the License. </execution> </executions> </plugin> + + <!-- Copy SQL jars into dedicated "sql-jars-kafka" directory. --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-dependency-plugin</artifactId> + <executions> + <execution> + <id>copy-for-sql-jars-kafka</id> + <phase>package</phase> + <goals> + <goal>copy</goal> + </goals> + <configuration> + <outputDirectory>${project.build.directory}/sql-jars-kafka</outputDirectory> + <!-- List of currently provided SQL jars. + When extending this list please also add a dependency + for the respective module. --> + <artifactItems> + <artifactItem> + <groupId>org.apache.flink</groupId> + <artifactId>flink-avro</artifactId> + <version>${project.version}</version> + <classifier>sql-jar</classifier> + <type>jar</type> + </artifactItem> + <artifactItem> + <groupId>org.apache.flink</groupId> + <artifactId>flink-json</artifactId> + <version>${project.version}</version> + <classifier>sql-jar</classifier> + <type>jar</type> + </artifactItem> + <!-- This SQL JAR is not used for now to avoid dependency conflicts; see FLINK-10107. 
+ <artifactItem> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-kafka-0.9_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <classifier>sql-jar</classifier> + <type>jar</type> + </artifactItem>--> + <artifactItem> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <classifier>sql-jar</classifier> + <type>jar</type> + </artifactItem> + <!-- This SQL JAR is not used for now to avoid dependency conflicts; see FLINK-10107. + <artifactItem> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <classifier>sql-jar</classifier> + <type>jar</type> + </artifactItem>--> + <artifactItem> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-elasticsearch6_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <classifier>sql-jar</classifier> + <type>jar</type> + </artifactItem> + </artifactItems> + </configuration> + </execution> + </executions> + </plugin> </plugins> </build> </project> diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh index 7e15870f1d3..181ef6105c6 100755 --- a/flink-end-to-end-tests/run-nightly-tests.sh +++ b/flink-end-to-end-tests/run-nightly-tests.sh @@ -142,6 +142,8 @@ run_test "State TTL Heap backend end-to-end test" "$END_TO_END_DIR/test-scripts/ run_test "State TTL RocksDb backend end-to-end test" "$END_TO_END_DIR/test-scripts/test_stream_state_ttl.sh rocks" run_test "SQL Client end-to-end test" "$END_TO_END_DIR/test-scripts/test_sql_client.sh" +run_test "SQL Client end-to-end test for kafka 0.10" "$END_TO_END_DIR/test-scripts/test_sql_client_kafka010.sh" +run_test "SQL Client end-to-end test for modern Kafka" "$END_TO_END_DIR/test-scripts/test_sql_client_kafka.sh" run_test "Heavy deployment end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_heavy_deployment.sh" diff --git a/flink-end-to-end-tests/run-pre-commit-tests.sh b/flink-end-to-end-tests/run-pre-commit-tests.sh index c9fab304916..879d29993fb 100755 --- a/flink-end-to-end-tests/run-pre-commit-tests.sh +++ b/flink-end-to-end-tests/run-pre-commit-tests.sh @@ -48,6 +48,9 @@ echo "Flink distribution directory: $FLINK_DIR" # those checks are disabled, one should take care that a proper checks are performed in the tests itself that ensure that the test finished # in an expected state. +run_test "SQL Client end-to-end test for kafka 0.10" "$END_TO_END_DIR/test-scripts/test_sql_client_kafka010.sh" +run_test "SQL Client end-to-end test for modern Kafka" "$END_TO_END_DIR/test-scripts/test_sql_client_kafka.sh" +run_test "SQL Client end-to-end test" "$END_TO_END_DIR/test-scripts/test_sql_client.sh" run_test "State Migration end-to-end test from 1.6" "$END_TO_END_DIR/test-scripts/test_state_migration.sh" run_test "State Evolution end-to-end test" "$END_TO_END_DIR/test-scripts/test_state_evolution.sh" run_test "Batch Python Wordcount end-to-end test" "$END_TO_END_DIR/test-scripts/test_batch_python_wordcount.sh" diff --git a/flink-end-to-end-tests/test-scripts/test_sql_client.sh b/flink-end-to-end-tests/test-scripts/test_sql_client.sh index b9d0ecaa0c6..25d8724ce41 100755 --- a/flink-end-to-end-tests/test-scripts/test_sql_client.sh +++ b/flink-end-to-end-tests/test-scripts/test_sql_client.sh @@ -19,17 +19,23 @@ set -Eeuo pipefail +KAFKA_CONNECTOR_VERSION="2.0" +KAFKA_VERSION="2.0.0" +CONFLUENT_VERSION="5.0.0" +CONFLUENT_MAJOR_VERSION="5.0" +LIB_DIR_NAME="sql-jars-kafka" + source "$(dirname "$0")"/common.sh -source "$(dirname "$0")"/kafka-common.sh 0.10.2.0 3.2.0 3.2 +source "$(dirname "$0")"/kafka-common.sh $KAFKA_VERSION $CONFLUENT_VERSION $CONFLUENT_MAJOR_VERSION source "$(dirname "$0")"/elasticsearch-common.sh -SQL_TOOLBOX_JAR=$END_TO_END_DIR/flink-sql-client-test/target/SqlToolbox.jar 
-SQL_JARS_DIR=$END_TO_END_DIR/flink-sql-client-test/target/sql-jars - ################################################################################ # Verify existing SQL jars ################################################################################ +SQL_TOOLBOX_JAR=$END_TO_END_DIR/flink-sql-client-test/target/SqlToolbox.jar +SQL_JARS_DIR=$END_TO_END_DIR/flink-sql-client-test/target/$LIB_DIR_NAME + EXTRACTED_JAR=$TEST_DATA_DIR/extracted mkdir -p $EXTRACTED_JAR @@ -152,7 +158,7 @@ tables: type: ROW<type VARCHAR, message VARCHAR> connector: type: kafka - version: "0.10" + version: "$KAFKA_CONNECTOR_VERSION" topic: test-json startup-mode: earliest-offset properties: @@ -185,72 +191,6 @@ tables: } } } - - name: AvroBothTable - type: source-sink-table - update-mode: append - schema: - - name: event_timestamp - type: VARCHAR - - name: user - type: VARCHAR - - name: message - type: VARCHAR - - name: duplicate_count - type: BIGINT - connector: - type: kafka - version: "0.10" - topic: test-avro - startup-mode: earliest-offset - properties: - - key: zookeeper.connect - value: localhost:2181 - - key: bootstrap.servers - value: localhost:9092 - format: - type: avro - avro-schema: > - { - "namespace": "org.apache.flink.table.tests", - "type": "record", - "name": "NormalizedEvent", - "fields": [ - {"name": "event_timestamp", "type": "string"}, - {"name": "user", "type": ["string", "null"]}, - {"name": "message", "type": "string"}, - {"name": "duplicate_count", "type": "long"} - ] - } - - name: CsvSinkTable - type: sink-table - update-mode: append - schema: - - name: event_timestamp - type: VARCHAR - - name: user - type: VARCHAR - - name: message - type: VARCHAR - - name: duplicate_count - type: BIGINT - - name: constant - type: VARCHAR - connector: - type: filesystem - path: $RESULT - format: - type: csv - fields: - - name: event_timestamp - type: VARCHAR - - name: user - type: VARCHAR - - name: message - type: VARCHAR - - name: duplicate_count - type: BIGINT - - name: 
constant - type: VARCHAR - name: ElasticsearchUpsertSinkTable type: sink-table update-mode: upsert @@ -308,62 +248,6 @@ EOF # submit SQL statements -echo "Executing SQL: Kafka JSON -> Kafka Avro" - -SQL_STATEMENT_1=$(cat << EOF -INSERT INTO AvroBothTable - SELECT - CAST(TUMBLE_START(rowtime, INTERVAL '1' HOUR) AS VARCHAR) AS event_timestamp, - user, - RegReplace(event.message, ' is ', ' was ') AS message, - COUNT(*) AS duplicate_count - FROM JsonSourceTable - WHERE user IS NOT NULL - GROUP BY - user, - event.message, - TUMBLE(rowtime, INTERVAL '1' HOUR) -EOF -) - -echo "$SQL_STATEMENT_1" - -$FLINK_DIR/bin/sql-client.sh embedded \ - --library $SQL_JARS_DIR \ - --jar $SQL_TOOLBOX_JAR \ - --environment $SQL_CONF \ - --update "$SQL_STATEMENT_1" - -echo "Executing SQL: Kafka Avro -> Filesystem CSV" - -SQL_STATEMENT_2=$(cat << EOF -INSERT INTO CsvSinkTable - SELECT AvroBothTable.*, RegReplace('Test constant folding.', 'Test', 'Success') AS constant - FROM AvroBothTable -EOF -) - -echo "$SQL_STATEMENT_2" - -$FLINK_DIR/bin/sql-client.sh embedded \ - --library $SQL_JARS_DIR \ - --jar $SQL_TOOLBOX_JAR \ - --environment $SQL_CONF \ - --update "$SQL_STATEMENT_2" - -echo "Waiting for CSV results..." 
-for i in {1..10}; do - if [ -e $RESULT ]; then - CSV_LINE_COUNT=`cat $RESULT | wc -l` - if [ $((CSV_LINE_COUNT)) -eq 4 ]; then - break - fi - fi - sleep 5 -done - -check_result_hash "SQL Client Kafka" $RESULT "0a1bf8bf716069b7269f575f87a802c0" - echo "Executing SQL: Values -> Elasticsearch (upsert)" SQL_STATEMENT_3=$(cat << EOF diff --git a/flink-end-to-end-tests/test-scripts/test_sql_client_kafka.sh b/flink-end-to-end-tests/test-scripts/test_sql_client_kafka.sh new file mode 100755 index 00000000000..e0594432be6 --- /dev/null +++ b/flink-end-to-end-tests/test-scripts/test_sql_client_kafka.sh @@ -0,0 +1,22 @@ +#!/usr/bin/env bash +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+################################################################################ + +set -Eeuo pipefail + +source "$(dirname "$0")"/test_sql_client_kafka_common.sh 2.0 2.0.0 5.0.0 5.0 sql-jars-kafka diff --git a/flink-end-to-end-tests/test-scripts/test_sql_client_kafka010.sh b/flink-end-to-end-tests/test-scripts/test_sql_client_kafka010.sh new file mode 100755 index 00000000000..c686ffbfb80 --- /dev/null +++ b/flink-end-to-end-tests/test-scripts/test_sql_client_kafka010.sh @@ -0,0 +1,22 @@ +#!/usr/bin/env bash +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+################################################################################ + +set -Eeuo pipefail + +source "$(dirname "$0")"/test_sql_client_kafka_common.sh 0.10 0.10.2.0 3.2.0 3.2 sql-jars-kafka-0.10 diff --git a/flink-end-to-end-tests/test-scripts/test_sql_client_kafka_common.sh b/flink-end-to-end-tests/test-scripts/test_sql_client_kafka_common.sh new file mode 100755 index 00000000000..7156c3e5993 --- /dev/null +++ b/flink-end-to-end-tests/test-scripts/test_sql_client_kafka_common.sh @@ -0,0 +1,310 @@ +#!/usr/bin/env bash +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+################################################################################ + +set -Eeuo pipefail + +KAFKA_CONNECTOR_VERSION="$1" +KAFKA_VERSION="$2" +CONFLUENT_VERSION="$3" +CONFLUENT_MAJOR_VERSION="$4" +LIB_DIR_NAME="$5" + +source "$(dirname "$0")"/common.sh +source "$(dirname "$0")"/kafka-common.sh $KAFKA_VERSION $CONFLUENT_VERSION $CONFLUENT_MAJOR_VERSION + +################################################################################ +# Verify existing SQL jars +################################################################################ + +SQL_TOOLBOX_JAR=$END_TO_END_DIR/flink-sql-client-test/target/SqlToolbox.jar +SQL_JARS_DIR=$END_TO_END_DIR/flink-sql-client-test/target/$LIB_DIR_NAME + +EXTRACTED_JAR=$TEST_DATA_DIR/extracted + +mkdir -p $EXTRACTED_JAR + +for SQL_JAR in $SQL_JARS_DIR/*.jar; do + echo "Checking SQL JAR: $SQL_JAR" + (cd $EXTRACTED_JAR && jar xf $SQL_JAR) + + # check for proper shading + for EXTRACTED_FILE in $(find $EXTRACTED_JAR -type f); do + + if ! [[ $EXTRACTED_FILE = "$EXTRACTED_JAR/org/apache/flink"* ]] && \ + ! [[ $EXTRACTED_FILE = "$EXTRACTED_JAR/META-INF"* ]] && \ + ! [[ $EXTRACTED_FILE = "$EXTRACTED_JAR/LICENSE"* ]] && \ + ! [[ $EXTRACTED_FILE = "$EXTRACTED_JAR/NOTICE"* ]] ; then + echo "Bad file in JAR: $EXTRACTED_FILE" + exit 1 + fi + done + + # check for proper factory + if [ ! 
-f $EXTRACTED_JAR/META-INF/services/org.apache.flink.table.factories.TableFactory ]; then + echo "No table factory found in JAR: $SQL_JAR" + exit 1 + fi + + # clean up + rm -r $EXTRACTED_JAR/* +done + +rm -r $EXTRACTED_JAR + +################################################################################ +# Prepare connectors +################################################################################ + +function sql_cleanup() { + # don't call ourselves again for another signal interruption + trap "exit -1" INT + # don't call ourselves again for normal exit + trap "" EXIT + + stop_kafka_cluster +} +trap sql_cleanup INT +trap sql_cleanup EXIT + +# prepare Kafka +echo "Preparing Kafka..." + +setup_kafka_dist + +start_kafka_cluster + +create_kafka_topic 1 1 test-json +create_kafka_topic 1 1 test-avro + +# put JSON data into Kafka +echo "Sending messages to Kafka..." + +send_messages_to_kafka '{"timestamp": "2018-03-12 08:00:00", "user": "Alice", "event": { "type": "WARNING", "message": "This is a warning."}}' test-json +# duplicate +send_messages_to_kafka '{"timestamp": "2018-03-12 08:10:00", "user": "Alice", "event": { "type": "WARNING", "message": "This is a warning."}}' test-json +send_messages_to_kafka '{"timestamp": "2018-03-12 09:00:00", "user": "Bob", "event": { "type": "WARNING", "message": "This is another warning."}}' test-json +send_messages_to_kafka '{"timestamp": "2018-03-12 09:10:00", "user": "Alice", "event": { "type": "INFO", "message": "This is a info."}}' test-json +send_messages_to_kafka '{"timestamp": "2018-03-12 09:20:00", "user": "Steve", "event": { "type": "INFO", "message": "This is another info."}}' test-json +# duplicate +send_messages_to_kafka '{"timestamp": "2018-03-12 09:30:00", "user": "Steve", "event": { "type": "INFO", "message": "This is another info."}}' test-json +# filtered in results +send_messages_to_kafka '{"timestamp": "2018-03-12 09:30:00", "user": null, "event": { "type": "WARNING", "message": "This is a bad message 
because the user is missing."}}' test-json +# pending in results +send_messages_to_kafka '{"timestamp": "2018-03-12 10:40:00", "user": "Bob", "event": { "type": "ERROR", "message": "This is an error."}}' test-json + +################################################################################ +# Run a SQL statement +################################################################################ + +echo "Testing SQL statement..." + +# prepare Flink +echo "Preparing Flink..." + +start_cluster +start_taskmanagers 2 + +# create session environment file +RESULT=$TEST_DATA_DIR/result +SQL_CONF=$TEST_DATA_DIR/sql-client-session.conf + +cat > $SQL_CONF << EOF +tables: + - name: JsonSourceTable + type: source-table + update-mode: append + schema: + - name: rowtime + type: TIMESTAMP + rowtime: + timestamps: + type: from-field + from: timestamp + watermarks: + type: periodic-bounded + delay: 2000 + - name: user + type: VARCHAR + - name: event + type: ROW<type VARCHAR, message VARCHAR> + connector: + type: kafka + version: "$KAFKA_CONNECTOR_VERSION" + topic: test-json + startup-mode: earliest-offset + properties: + - key: zookeeper.connect + value: localhost:2181 + - key: bootstrap.servers + value: localhost:9092 + format: + type: json + json-schema: > + { + "type": "object", + "properties": { + "timestamp": { + "type": "string" + }, + "user": { + "type": ["string", "null"] + }, + "event": { + "type": "object", + "properties": { + "type": { + "type": "string" + }, + "message": { + "type": "string" + } + } + } + } + } + - name: AvroBothTable + type: source-sink-table + update-mode: append + schema: + - name: event_timestamp + type: VARCHAR + - name: user + type: VARCHAR + - name: message + type: VARCHAR + - name: duplicate_count + type: BIGINT + connector: + type: kafka + version: "$KAFKA_CONNECTOR_VERSION" + topic: test-avro + startup-mode: earliest-offset + properties: + - key: zookeeper.connect + value: localhost:2181 + - key: bootstrap.servers + value: localhost:9092 + 
format: + type: avro + avro-schema: > + { + "namespace": "org.apache.flink.table.tests", + "type": "record", + "name": "NormalizedEvent", + "fields": [ + {"name": "event_timestamp", "type": "string"}, + {"name": "user", "type": ["string", "null"]}, + {"name": "message", "type": "string"}, + {"name": "duplicate_count", "type": "long"} + ] + } + - name: CsvSinkTable + type: sink-table + update-mode: append + schema: + - name: event_timestamp + type: VARCHAR + - name: user + type: VARCHAR + - name: message + type: VARCHAR + - name: duplicate_count + type: BIGINT + - name: constant + type: VARCHAR + connector: + type: filesystem + path: $RESULT + format: + type: csv + fields: + - name: event_timestamp + type: VARCHAR + - name: user + type: VARCHAR + - name: message + type: VARCHAR + - name: duplicate_count + type: BIGINT + - name: constant + type: VARCHAR + +functions: + - name: RegReplace + from: class + class: org.apache.flink.table.toolbox.StringRegexReplaceFunction +EOF + +# submit SQL statements + +echo "Executing SQL: Kafka JSON -> Kafka Avro" + +SQL_STATEMENT_1=$(cat << EOF +INSERT INTO AvroBothTable + SELECT + CAST(TUMBLE_START(rowtime, INTERVAL '1' HOUR) AS VARCHAR) AS event_timestamp, + user, + RegReplace(event.message, ' is ', ' was ') AS message, + COUNT(*) AS duplicate_count + FROM JsonSourceTable + WHERE user IS NOT NULL + GROUP BY + user, + event.message, + TUMBLE(rowtime, INTERVAL '1' HOUR) +EOF +) + +echo "$SQL_STATEMENT_1" + +$FLINK_DIR/bin/sql-client.sh embedded \ + --library $SQL_JARS_DIR \ + --jar $SQL_TOOLBOX_JAR \ + --environment $SQL_CONF \ + --update "$SQL_STATEMENT_1" + +echo "Executing SQL: Kafka Avro -> Filesystem CSV" + +SQL_STATEMENT_2=$(cat << EOF +INSERT INTO CsvSinkTable + SELECT AvroBothTable.*, RegReplace('Test constant folding.', 'Test', 'Success') AS constant + FROM AvroBothTable +EOF +) + +echo "$SQL_STATEMENT_2" + +$FLINK_DIR/bin/sql-client.sh embedded \ + --library $SQL_JARS_DIR \ + --jar $SQL_TOOLBOX_JAR \ + --environment 
$SQL_CONF \ + --update "$SQL_STATEMENT_2" + +echo "Waiting for CSV results..." +for i in {1..10}; do + if [ -e $RESULT ]; then + CSV_LINE_COUNT=`cat $RESULT | wc -l` + if [ $((CSV_LINE_COUNT)) -eq 4 ]; then + break + fi + fi + sleep 5 +done + +check_result_hash "SQL Client Kafka" $RESULT "0a1bf8bf716069b7269f575f87a802c0" ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Extend SQL client end-to-end to test new KafkaTableSink > ------------------------------------------------------- > > Key: FLINK-10624 > URL: https://issues.apache.org/jira/browse/FLINK-10624 > Project: Flink > Issue Type: Sub-task > Components: Kafka Connector, Table API & SQL, Tests > Affects Versions: 1.7.0 > Reporter: Till Rohrmann > Assignee: vinoyang > Priority: Critical > Labels: pull-request-available > Fix For: 1.7.0 > > > With FLINK-9697, we added support for Kafka 2.0. We should also extend the > existing streaming client end-to-end test to test the new > {{KafkaTableSink}} against Kafka 2.0. -- This message was sent by Atlassian JIRA (v7.6.3#76005)