godfreyhe commented on a change in pull request #17205: URL: https://github.com/apache/flink/pull/17205#discussion_r709827514
########## File path: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/MatchRecognizeTest.scala ########## @@ -109,4 +216,136 @@ class MatchRecognizeTest extends TableTestBase { |""".stripMargin util.verifyRelPlan(sqlQuery) } + + // ---------------------------------------------------------------------------------------- + // Tests for Illegal use of Match_RowTime + // ---------------------------------------------------------------------------------------- + + @Test + def testMatchRowtimeWithoutArgumentOnRowtimeLTZ(): Unit = { + thrown.expectMessage( + "MATCH_ROWTIME(rowtimeField) should be used when input stream " + + "contains rowtime attribute with TIMESTAMP_LTZ type.\n" + + "Please pass rowtime attribute field as input argument of " + + "MATCH_ROWTIME(rowtimeField) function.") + thrown.expect(classOf[AssertionError]) + + val sqlQuery = + s""" + |SELECT + | symbol, + | SUM(price) as price, + | TUMBLE_ROWTIME(matchRowtime, interval '3' second) as rowTime, + | TUMBLE_START(matchRowtime, interval '3' second) as startTime + |FROM Ticker + |MATCH_RECOGNIZE ( + | PARTITION BY symbol + | ORDER BY ts_ltz + | MEASURES + | A.price as price, + | A.tax as tax, + | MATCH_ROWTIME() as matchRowtime + | ONE ROW PER MATCH + | PATTERN (A) + | DEFINE + | A AS A.price > 0 + |) AS T + |GROUP BY symbol, TUMBLE(matchRowtime, interval '3' second) + |""".stripMargin + util.verifyRelPlan(sqlQuery) + } + + @Test + def testMatchRowtimeWithMultipleArgs(): Unit = { + thrown.expectMessage("Invalid number of arguments to function 'MATCH_ROWTIME'.") + thrown.expect(classOf[ValidationException]) + + val sqlQuery = + s""" + |SELECT + | symbol, + | SUM(price) as price, + | TUMBLE_ROWTIME(matchRowtime, interval '3' second) as rowTime, + | TUMBLE_START(matchRowtime, interval '3' second) as startTime + |FROM Ticker + |MATCH_RECOGNIZE ( + | PARTITION BY symbol + | ORDER BY ts_ltz + | MEASURES + | A.price as price, + | A.tax as tax, + | MATCH_ROWTIME(ts_ltz, price) as matchRowtime + | ONE ROW PER MATCH + | PATTERN (A) + | DEFINE + | A AS A.price > 0 + |) AS T + |GROUP BY symbol, TUMBLE(matchRowtime, interval '3' second) + |""".stripMargin + util.verifyRelPlan(sqlQuery) + } + + @Test + def testMatchRowtimeWithNonRowTimeAttributeAsArgs(): Unit = { + thrown.expectMessage( + "The function MATCH_ROWTIME requires argument to be a row time attribute type, " + + "but is 'INTEGER'.") + thrown.expect(classOf[ValidationException]) + + val sqlQuery = + s""" + |SELECT + | symbol, + | SUM(price) as price, + | TUMBLE_ROWTIME(matchRowtime, interval '3' second) as rowTime, + | TUMBLE_START(matchRowtime, interval '3' second) as startTime + |FROM Ticker + |MATCH_RECOGNIZE ( + | PARTITION BY symbol + | ORDER BY ts_ltz + | MEASURES + | A.price as price, + | A.tax as tax, + | MATCH_ROWTIME(price) as matchRowtime + | ONE ROW PER MATCH + | PATTERN (A) + | DEFINE + | A AS A.price > 0 + |) AS T + |GROUP BY symbol, TUMBLE(matchRowtime, interval '3' second) + |""".stripMargin + util.verifyRelPlan(sqlQuery) + } + + @Test + def testMatchRowtimeWithRexCallAsArg(): Unit = { + thrown.expectMessage( + "The function MATCH_ROWTIME requires a field reference as argument, " + + "but actual argument is 'PLUS'.") Review comment: we can improve the error message ########## File path: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/MatchRecognizeTest.scala ########## @@ -70,6 +112,71 @@ class MatchRecognizeTest extends TableTestBase { util.verifyRelPlan(sqlQuery) } + def testWindowTVFOnMatchRecognizeOnRowtimeLTZ(): Unit = { Review comment: add `@Test` ########## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/MatchRowTimeFunction.java ########## @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.functions.sql; + +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.planner.calcite.FlinkTypeFactory; + +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.sql.SqlCallBinding; +import org.apache.calcite.sql.SqlFunction; +import org.apache.calcite.sql.SqlFunctionCategory; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlOperandCountRange; +import org.apache.calcite.sql.SqlOperatorBinding; +import org.apache.calcite.sql.type.SqlOperandCountRanges; + +import java.util.List; + +/** + * The function used to access a rowtime attribute with TIMESTAMP or TIMESTAMP_LTZ type from + * MATCH_RECOGNIZE clause. The function accepts zero or one operand which is a field reference with + * rowtime attribute. If there is no operand, the function will return rowtime attribute with + * TIMESTAMP type. Otherwise, the return type will be same with the operand type. + */ +public class MatchRowTimeFunction extends SqlFunction { + + public MatchRowTimeFunction() { + super( + "MATCH_ROWTIME", + SqlKind.OTHER_FUNCTION, + null, + null, + null, + SqlFunctionCategory.MATCH_RECOGNIZE); + } + + @Override + public String getAllowedSignatures(String opNameToUse) { + return "MATCH_ROWTIME([rowtime_field])"; + } + + @Override + public SqlOperandCountRange getOperandCountRange() { + return SqlOperandCountRanges.between(0, 1); + } + + public String getSignatureTemplate(final int operandsCount) { + switch (operandsCount) { + case 0: + return "{}"; + case 1: + return "{0})"; + default: + throw new AssertionError(); + } + } + + @Override + public boolean checkOperandTypes(SqlCallBinding callBinding, boolean throwOnFailure) { + List<SqlNode> operands = callBinding.operands(); + int n = operands.size(); + assert n == 0 || n == 1; + if (n == 0) { + return true; + } else { + SqlNode operand = callBinding.operand(0); + if (operand.getKind() != SqlKind.IDENTIFIER) { + if (throwOnFailure) { + ValidationException exception = + new ValidationException( + String.format( + "The function %s requires a field reference as argument, but actual argument is '%s'.", + callBinding.getOperator().getName(), + operand.getKind())); + throw exception; + } else { + return false; + } + } + RelDataType operandType = callBinding.getOperandType(0); + if (FlinkTypeFactory.isRowtimeIndicatorType(operandType)) { + return true; + } else { + if (throwOnFailure) { + ValidationException exception = Review comment: ditto ########## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/MatchRowTimeFunction.java ########## @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.functions.sql; + +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.planner.calcite.FlinkTypeFactory; + +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.sql.SqlCallBinding; +import org.apache.calcite.sql.SqlFunction; +import org.apache.calcite.sql.SqlFunctionCategory; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlOperandCountRange; +import org.apache.calcite.sql.SqlOperatorBinding; +import org.apache.calcite.sql.type.SqlOperandCountRanges; + +import java.util.List; + +/** + * The function used to access a rowtime attribute with TIMESTAMP or TIMESTAMP_LTZ type from + * MATCH_RECOGNIZE clause. The function accepts zero or one operand which is a field reference with + * rowtime attribute. If there is no operand, the function will return rowtime attribute with + * TIMESTAMP type. Otherwise, the return type will be same with the operand type. + */ +public class MatchRowTimeFunction extends SqlFunction { + + public MatchRowTimeFunction() { + super( + "MATCH_ROWTIME", + SqlKind.OTHER_FUNCTION, + null, + null, + null, + SqlFunctionCategory.MATCH_RECOGNIZE); + } + + @Override + public String getAllowedSignatures(String opNameToUse) { + return "MATCH_ROWTIME([rowtime_field])"; + } + + @Override + public SqlOperandCountRange getOperandCountRange() { + return SqlOperandCountRanges.between(0, 1); + } + + public String getSignatureTemplate(final int operandsCount) { + switch (operandsCount) { + case 0: + return "{}"; + case 1: + return "{0})"; + default: + throw new AssertionError(); + } + } + + @Override + public boolean checkOperandTypes(SqlCallBinding callBinding, boolean throwOnFailure) { + List<SqlNode> operands = callBinding.operands(); + int n = operands.size(); + assert n == 0 || n == 1; + if (n == 0) { + return true; + } else { + SqlNode operand = callBinding.operand(0); + if (operand.getKind() != SqlKind.IDENTIFIER) { + if (throwOnFailure) { + ValidationException exception = Review comment: inline variable ########## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java ########## @@ -121,18 +107,9 @@ public void lookupOperatorOverloads( /** * Function used to access a event time attribute with TIMESTAMP or TIMESTAMP_LTZ type from Review comment: typo: a -> an -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org