leonardBang commented on a change in pull request #17205:
URL: https://github.com/apache/flink/pull/17205#discussion_r707457704



##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/MatchRowTimeFunction.java
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.sql;
+
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.sql.SqlCallBinding;
+import org.apache.calcite.sql.SqlFunction;
+import org.apache.calcite.sql.SqlFunctionCategory;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperandCountRange;
+import org.apache.calcite.sql.SqlOperatorBinding;
+import org.apache.calcite.sql.type.SqlOperandCountRanges;
+
+import java.util.List;
+
+/**
+ * Function used to access an row time attribute with TIMESTAMP or 
TIMESTAMP_LTZ type from
+ * MATCH_RECOGNIZE. The function could receive no operand or one operand which 
is a field reference
+ * with row time attribute. If there is no operand, the function returns row 
time attribute with
+ * TIMESTAMP. Else, return type is same with operand type.

Review comment:
       ```suggestion
    * The function used to access a rowtime attribute with TIMESTAMP or 
TIMESTAMP_LTZ type from
    * MATCH_RECOGNIZE clause. The function accepts zero or one operand which is 
a field reference
    * with rowtime attribute. If there is no operand, the function will return 
rowtime attribute with
    * TIMESTAMP type. Otherwise, the return type will be the same as the operand 
type.
   ```

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/MatchRowTimeFunction.java
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.sql;
+
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.sql.SqlCallBinding;
+import org.apache.calcite.sql.SqlFunction;
+import org.apache.calcite.sql.SqlFunctionCategory;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperandCountRange;
+import org.apache.calcite.sql.SqlOperatorBinding;
+import org.apache.calcite.sql.type.SqlOperandCountRanges;
+
+import java.util.List;
+
+/**
+ * Function used to access an row time attribute with TIMESTAMP or 
TIMESTAMP_LTZ type from
+ * MATCH_RECOGNIZE. The function could receive no operand or one operand which 
is a field reference
+ * with row time attribute. If there is no operand, the function returns row 
time attribute with
+ * TIMESTAMP. Else, return type is same with operand type.
+ */
+public class MatchRowTimeFunction extends SqlFunction {
+
+    /** Creates a window table function with a given name. */
+    public MatchRowTimeFunction() {
+        super(
+                "MATCH_ROWTIME",
+                SqlKind.OTHER_FUNCTION,
+                null,
+                null,
+                null,
+                SqlFunctionCategory.MATCH_RECOGNIZE);
+    }
+
+    @Override
+    public String getAllowedSignatures(String opNameToUse) {
+        return "MATCH_ROWTIME([time attribute field])";
+    }
+
+    @Override
+    public SqlOperandCountRange getOperandCountRange() {
+        return SqlOperandCountRanges.between(0, 1);
+    }
+
+    public String getSignatureTemplate(final int operandsCount) {
+        switch (operandsCount) {
+            case 0:
+                return "{}";
+            case 1:
+                return "{0})";
+            default:
+                throw new AssertionError();
+        }
+    }
+
+    @Override
+    public boolean checkOperandTypes(SqlCallBinding callBinding, boolean 
throwOnFailure) {
+        List<SqlNode> operands = callBinding.operands();
+        int n = operands.size();
+        assert n == 0 || n == 1;
+        if (n == 0) {
+            return true;
+        } else {
+            SqlNode operand = callBinding.operand(0);
+            if (operand.getKind() != SqlKind.IDENTIFIER) {
+                if (throwOnFailure) {
+                    ValidationException exception =
+                            new ValidationException(
+                                    String.format(
+                                            "The function %s requires argument 
to be a field reference, but is '%s'.",

Review comment:
       ```suggestion
                                               "The function %s requires a 
field reference as argument, but actual argument is '%s'.",
   ```

##########
File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalMatch.scala
##########
@@ -65,6 +70,28 @@ class FlinkLogicalMatch(
     interval)
   with FlinkLogicalRel {
 
+  override def isValid(litmus: Litmus, context: RelNode.Context): Boolean = {
+    val inputContainsRowTimeLtz = input.getRowType.getFieldList.exists { field 
=>
+      isRowtimeIndicatorType(field.getType) &&
+        field.getType.getSqlTypeName == 
SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE
+    }
+    if (inputContainsRowTimeLtz) {
+      val containMatchRowTimeWithoutArgs = getMeasures.values().exists(
+        MatchUtil.isFinalOnMatchRowTimeWithoutArgs)
+      if (containMatchRowTimeWithoutArgs) {
+        litmus.fail(
+          "MATCH_ROWTIME() could only be used when input stream does not 
contain " +
+            "row time attribute with TIMESTAMP_LTZ type.\n" +
+            "Please pass rowtime attribute field as input argument of 
MATCH_ROWTIME function.")

Review comment:
       ```suggestion
             "MATCH_ROWTIME(rowtimeField) should be used when input stream 
contains " +
               "rowtime attribute with TIMESTAMP_LTZ type.\n" +
               "Please pass rowtime attribute field as input argument of 
MATCH_ROWTIME(rowtimeField) function.")
   ```

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/MatchRowTimeFunction.java
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.sql;
+
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.sql.SqlCallBinding;
+import org.apache.calcite.sql.SqlFunction;
+import org.apache.calcite.sql.SqlFunctionCategory;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperandCountRange;
+import org.apache.calcite.sql.SqlOperatorBinding;
+import org.apache.calcite.sql.type.SqlOperandCountRanges;
+
+import java.util.List;
+
+/**
+ * Function used to access an row time attribute with TIMESTAMP or 
TIMESTAMP_LTZ type from
+ * MATCH_RECOGNIZE. The function could receive no operand or one operand which 
is a field reference
+ * with row time attribute. If there is no operand, the function returns row 
time attribute with
+ * TIMESTAMP. Else, return type is same with operand type.
+ */
+public class MatchRowTimeFunction extends SqlFunction {
+
+    /** Creates a window table function with a given name. */
+    public MatchRowTimeFunction() {
+        super(
+                "MATCH_ROWTIME",
+                SqlKind.OTHER_FUNCTION,
+                null,
+                null,
+                null,
+                SqlFunctionCategory.MATCH_RECOGNIZE);
+    }
+
+    @Override
+    public String getAllowedSignatures(String opNameToUse) {
+        return "MATCH_ROWTIME([time attribute field])";
+    }
+
+    @Override
+    public SqlOperandCountRange getOperandCountRange() {
+        return SqlOperandCountRanges.between(0, 1);
+    }
+
+    public String getSignatureTemplate(final int operandsCount) {
+        switch (operandsCount) {
+            case 0:
+                return "{}";
+            case 1:
+                return "{0})";
+            default:
+                throw new AssertionError();
+        }
+    }
+
+    @Override
+    public boolean checkOperandTypes(SqlCallBinding callBinding, boolean 
throwOnFailure) {
+        List<SqlNode> operands = callBinding.operands();
+        int n = operands.size();
+        assert n == 0 || n == 1;
+        if (n == 0) {
+            return true;
+        } else {
+            SqlNode operand = callBinding.operand(0);
+            if (operand.getKind() != SqlKind.IDENTIFIER) {
+                if (throwOnFailure) {
+                    ValidationException exception =
+                            new ValidationException(
+                                    String.format(
+                                            "The function %s requires argument 
to be a field reference, but is '%s'.",
+                                            
callBinding.getOperator().getName(),
+                                            operand.getKind()));
+                    throw exception;
+                } else {
+                    return false;
+                }
+            }
+            RelDataType operandType = callBinding.getOperandType(0);
+            if (FlinkTypeFactory.isRowtimeIndicatorType(operandType)) {
+                return true;
+            } else {
+                if (throwOnFailure) {
+                    ValidationException exception =
+                            new ValidationException(
+                                    String.format(
+                                            "The function %s requires argument 
to be a row time attribute type, but is '%s'.",
+                                            
callBinding.getOperator().getName(), operandType));
+                    throw exception;
+                } else {
+                    return false;
+                }
+            }
+        }
+    }
+
+    @Override
+    public RelDataType inferReturnType(SqlOperatorBinding opBinding) {
+        // Returns RowTime if there is no argument

Review comment:
       nit: please write `rowtime` instead of `RowTime` in this comment.

##########
File path: 
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/MatchRecognizeTest.scala
##########
@@ -43,6 +44,47 @@ class MatchRecognizeTest extends TableTestBase {
     util.tableEnv.executeSql(ddl)
   }
 
+  @Test
+  def testMatchRecognizeOnRowtime(): Unit = {
+    val ddl =
+      """
+        |CREATE TABLE Ticker1 (
+        | `symbol` STRING,
+        | `ts_ltz` TIMESTAMP(3),

Review comment:
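       hint: rename this column to `ts`; its type is TIMESTAMP(3), not TIMESTAMP_LTZ, so the `_ltz` suffix is misleading.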

##########
File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalMatch.scala
##########
@@ -65,6 +70,28 @@ class FlinkLogicalMatch(
     interval)
   with FlinkLogicalRel {
 
+  override def isValid(litmus: Litmus, context: RelNode.Context): Boolean = {
+    val inputContainsRowTimeLtz = input.getRowType.getFieldList.exists { field 
=>
+      isRowtimeIndicatorType(field.getType) &&
+        field.getType.getSqlTypeName == 
SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE
+    }

Review comment:
       hint: this condition could be written as
       `isRowtimeIndicatorType(field.getType) && isTimestampLtzIndicatorType(field.getType)`

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/MatchRowTimeFunction.java
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.sql;
+
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.sql.SqlCallBinding;
+import org.apache.calcite.sql.SqlFunction;
+import org.apache.calcite.sql.SqlFunctionCategory;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperandCountRange;
+import org.apache.calcite.sql.SqlOperatorBinding;
+import org.apache.calcite.sql.type.SqlOperandCountRanges;
+
+import java.util.List;
+
+/**
+ * Function used to access an row time attribute with TIMESTAMP or 
TIMESTAMP_LTZ type from
+ * MATCH_RECOGNIZE. The function could receive no operand or one operand which 
is a field reference
+ * with row time attribute. If there is no operand, the function returns row 
time attribute with
+ * TIMESTAMP. Else, return type is same with operand type.
+ */
+public class MatchRowTimeFunction extends SqlFunction {
+
+    /** Creates a window table function with a given name. */
+    public MatchRowTimeFunction() {
+        super(
+                "MATCH_ROWTIME",
+                SqlKind.OTHER_FUNCTION,
+                null,
+                null,
+                null,

Review comment:
       Please provide proper `TypeInference` arguments here instead of passing `null`.

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/MatchRowTimeFunction.java
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.sql;
+
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.sql.SqlCallBinding;
+import org.apache.calcite.sql.SqlFunction;
+import org.apache.calcite.sql.SqlFunctionCategory;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperandCountRange;
+import org.apache.calcite.sql.SqlOperatorBinding;
+import org.apache.calcite.sql.type.SqlOperandCountRanges;
+
+import java.util.List;
+
+/**
+ * Function used to access an row time attribute with TIMESTAMP or 
TIMESTAMP_LTZ type from
+ * MATCH_RECOGNIZE. The function could receive no operand or one operand which 
is a field reference
+ * with row time attribute. If there is no operand, the function returns row 
time attribute with
+ * TIMESTAMP. Else, return type is same with operand type.
+ */
+public class MatchRowTimeFunction extends SqlFunction {
+
+    /** Creates a window table function with a given name. */

Review comment:
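       "window table function"? This constructor takes no name and creates the MATCH_ROWTIME function, so the Javadoc looks wrong.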




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to