[ https://issues.apache.org/jira/browse/FLINK-10623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16666978#comment-16666978 ]
ASF GitHub Bot commented on FLINK-10623: ---------------------------------------- dawidwys closed pull request #6926: [FLINK-10623][e2e] Extended sql-client e2e test with MATCH_RECOGNIZE URL: https://github.com/apache/flink/pull/6926 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-end-to-end-tests/test-scripts/common.sh b/flink-end-to-end-tests/test-scripts/common.sh index 618aae8da9a..5eb2c93a0f6 100644 --- a/flink-end-to-end-tests/test-scripts/common.sh +++ b/flink-end-to-end-tests/test-scripts/common.sh @@ -306,6 +306,7 @@ function check_logs_for_errors { | grep -v "Async Kafka commit failed" \ | grep -v "DisconnectException" \ | grep -v "AskTimeoutException" \ + | grep -v "Error while loading kafka-version.properties" \ | grep -v "WARN akka.remote.transport.netty.NettyTransport" \ | grep -v "WARN org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline" \ | grep -v "jvm-exit-on-fatal-error" \ diff --git a/flink-end-to-end-tests/test-scripts/test_sql_client.sh b/flink-end-to-end-tests/test-scripts/test_sql_client.sh index 6b57dd9a129..ce3d1944877 100755 --- a/flink-end-to-end-tests/test-scripts/test_sql_client.sh +++ b/flink-end-to-end-tests/test-scripts/test_sql_client.sh @@ -404,3 +404,31 @@ wait_job_terminal_state "$JOB_ID" "FINISHED" # 3 upsert results and 6 append results verify_result_line_number 9 "$ELASTICSEARCH_INDEX" + +echo "Executing SQL: Match recognize -> Elasticsearch" + +read -r -d '' SQL_STATEMENT_5 << EOF +INSERT INTO ElasticsearchAppendSinkTable + SELECT 1 as user_id, T.userName as user_name, cast(1 as BIGINT) as user_count + FROM ( + SELECT user, rowtime + FROM JsonSourceTable + WHERE user IS NOT NULL) + MATCH_RECOGNIZE ( + ORDER BY rowtime + MEASURES + user as userName + PATTERN (A) + DEFINE + A as user = 'Alice' + ) T +EOF + +JOB_ID=$($FLINK_DIR/bin/sql-client.sh embedded \ + --library $SQL_JARS_DIR \ + --jar $SQL_TOOLBOX_JAR \ + --environment $SQL_CONF \ + --update "$SQL_STATEMENT_5" | grep "Job ID:" | sed 's/.* //g') + +# 3 upsert results and 6 append results and 3 match_recognize results +verify_result_line_number 12 "$ELASTICSEARCH_INDEX" diff --git a/flink-libraries/flink-sql-client/pom.xml b/flink-libraries/flink-sql-client/pom.xml index 68ec71857bb..b7b21f861f9 100644 --- a/flink-libraries/flink-sql-client/pom.xml +++ b/flink-libraries/flink-sql-client/pom.xml @@ -64,6 +64,12 @@ under the License. <version>${project.version}</version> </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-cep_2.11</artifactId> + <version>${project.version}</version> + </dependency> + <!-- logging utilities --> <dependency> <groupId>org.slf4j</groupId> @@ -156,6 +162,7 @@ under the License. <include>org/apache/calcite/**</include> <include>org/apache/flink/calcite/shaded/**</include> <include>org/apache/flink/table/**</include> + <include>org/apache/flink/cep/**</include> <include>org.codehaus.commons.compiler.properties</include> <include>org/codehaus/janino/**</include> <include>org/codehaus/commons/**</include> diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamMatchRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamMatchRule.scala index bbebc73b24d..5b0aa65362f 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamMatchRule.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamMatchRule.scala @@ -42,11 +42,13 @@ class DataStreamMatchRule RelOptRule.convert(logicalMatch.getInput, FlinkConventions.DATASTREAM) try { - Class.forName("org.apache.flink.cep.pattern.Pattern") + Class + .forName("org.apache.flink.cep.pattern.Pattern", + false, + Thread.currentThread().getContextClassLoader) } catch { case ex: ClassNotFoundException => throw new TableException( - "MATCH RECOGNIZE clause requires flink-cep dependency to be present on the classpath.", - ex) + "MATCH RECOGNIZE clause requires flink-cep dependency to be present on the classpath.", ex) } new DataStreamMatch( ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Extend streaming SQL end-to-end test to test MATCH_RECOGNIZE > ------------------------------------------------------------ > > Key: FLINK-10623 > URL: https://issues.apache.org/jira/browse/FLINK-10623 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL, Tests > Affects Versions: 1.7.0 > Reporter: Till Rohrmann > Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > We should extend the existing {{test_streaming_sql.sh}} to test the newly > added {{MATCH_RECOGNIZE}} functionality. -- This message was sent by Atlassian JIRA (v7.6.3#76005)