Hello,

I use Flink for continuous evaluation of SQL queries on streaming data. One of our use cases requires running recursive SQL queries, and I cannot find a way to change the rowtime attribute of an intermediate result table.

For example, let's assume that there is a table T0 with the schema:

root
 |-- str1: STRING
 |-- int1: BIGINT
 |-- utime: TIMESTAMP(3)
 |-- itime: TIMESTAMP(3) *ROWTIME*

Now, let's create a view V0:

var V0 = tEnv_.sqlQuery("select str1, int1, utime, itime from T0");

I wish to change the rowtime of V0 from itime to utime. I tried doing:

V0 = V0.addOrReplaceColumns($("utime").as("utime").rowtime());

but ran into the following exception:

org.apache.flink.table.api.ValidationException: Window properties can only be used on windowed tables.
    at org.apache.flink.table.operations.utils.OperationTreeBuilder$NoWindowPropertyChecker.visit(OperationTreeBuilder.java:854) ~[flink-table-api-java-1.11.1.jar:1.11.1]
    at org.apache.flink.table.operations.utils.OperationTreeBuilder$NoWindowPropertyChecker.visit(OperationTreeBuilder.java:843) ~[flink-table-api-java-1.11.1.jar:1.11.1]
    at org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:39) ~[flink-table-api-java-1.11.1.jar:1.11.1]
    at org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:132) ~[flink-table-api-java-1.11.1.jar:1.11.1]
    at org.apache.flink.table.operations.utils.OperationTreeBuilder.lambda$project$1(OperationTreeBuilder.java:158) ~[flink-table-api-java-1.11.1.jar:1.11.1]
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1540) ~[na:na]
    at org.apache.flink.table.operations.utils.OperationTreeBuilder.project(OperationTreeBuilder.java:158) ~[flink-table-api-java-1.11.1.jar:1.11.1]
    at org.apache.flink.table.operations.utils.OperationTreeBuilder.addColumns(OperationTreeBuilder.java:207) ~[flink-table-api-java-1.11.1.jar:1.11.1]
    at org.apache.flink.table.api.internal.TableImpl.addColumnsOperation(TableImpl.java:475) ~[flink-table-api-java-1.11.1.jar:1.11.1]
    at org.apache.flink.table.api.internal.TableImpl.addOrReplaceColumns(TableImpl.java:459) ~[flink-table-api-java-1.11.1.jar:1.11.1]

Any guidance on how to address this?

Regards,
Satyam