Thanks for your replies Matthias and Timo. Converting the Table to DataStream, assigning a new Watermark & Rowtime attribute, and converting back to Table makes sense. One challenge with that approach is that Table to DataStream conversion could emit retractable data stream, however, I think, that can now be handled with the new TableSource API (in 1.11) that allows TableSource to emit retractions.
I'll try this approach when I migrate to the new API and report back. Regards, Satyam On Tue, Sep 1, 2020 at 4:46 AM Timo Walther <t...@ververica.com> wrote: > Hi Satyam, > > Matthias is right. A rowtime attribute cannot be modified and needs to be > passed "as is" through the pipeline. The only exceptions are if a newer > rowtime is offered such as `TUMBLE_ROWTIME` or `MATCH_ROWTIME`. In your > case, you need to define utime as the time attribute. If this is not > possible, you either express the computation in regular SQL (with > non-streaming optimizations) or you go to DataStream API prepare the table > (assign new watermark and StreamRecord timestamp there) and go back to > Table API. > > I hope this helps. > > Regards, > Timo > > On Tue, Sep 1, 2020 at 11:40 AM Matthias Pohl <matth...@ververica.com> > wrote: > >> Hi Satyam, >> Thanks for your post. Unfortunately, it looks like you cannot change the >> rowtime column here. The rowtime is strongly coupled with the Watermarks >> feature. By changing the rowtime column we cannot ensure that the >> watermarks are still aligned as Fabian mentioned in [1]. >> >> @Timo Walther <t...@ververica.com> : Could you verify my findings? >> >> Best, >> Matthias >> >> [1] >> https://stackoverflow.com/questions/52784089/flink-table-sql-api-modify-rowtime-attribute-after-session-window-aggregation >> >> On Mon, Aug 31, 2020 at 6:44 PM Satyam Shekhar <satyamshek...@gmail.com> >> wrote: >> >>> Hello, >>> >>> I use Flink for continuous evaluation of SQL queries on streaming data. >>> One of the use cases requires us to run recursive SQL queries. I am unable >>> to find a way to edit rowtime time attribute of the intermediate result >>> table. >>> >>> For example, let's assume that there is a table T0 with schema - >>> root >>> |-- str1: STRING >>> |-- int1: BIGINT >>> |-- utime: TIMESTAMP(3) >>> |-- itime: TIMESTAMP(3) *ROWTIME* >>> >>> Now, let's create a view V0 - >>> var V0 = tEnv_.sqlQuery("select str1, int1, utime, itime from T0"); >>> >>> I wish to change the rowtime of V0 from itime to utime. I tried doing - >>> >>> V0 = V0.addOrReplaceColumns($("utime").as("utime").rowtime()); >>> >>> but ran into the following exception - >>> >>> org.apache.flink.table.api.ValidationException: Window properties can >>> only be used on windowed tables. >>> at >>> org.apache.flink.table.operations.utils.OperationTreeBuilder$NoWindowPropertyChecker.visit(OperationTreeBuilder.java:854) >>> ~[flink-table-api-java-1.11.1.jar:1.11.1] >>> at >>> org.apache.flink.table.operations.utils.OperationTreeBuilder$NoWindowPropertyChecker.visit(OperationTreeBuilder.java:843) >>> ~[flink-table-api-java-1.11.1.jar:1.11.1] >>> at >>> org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:39) >>> ~[flink-table-api-java-1.11.1.jar:1.11.1] >>> at >>> org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:132) >>> ~[flink-table-api-java-1.11.1.jar:1.11.1] >>> at >>> org.apache.flink.table.operations.utils.OperationTreeBuilder.lambda$project$1(OperationTreeBuilder.java:158) >>> ~[flink-table-api-java-1.11.1.jar:1.11.1] >>> at java.base/java.util.ArrayList.forEach(ArrayList.java:1540) ~[na:na] >>> at >>> org.apache.flink.table.operations.utils.OperationTreeBuilder.project(OperationTreeBuilder.java:158) >>> ~[flink-table-api-java-1.11.1.jar:1.11.1] >>> at >>> org.apache.flink.table.operations.utils.OperationTreeBuilder.addColumns(OperationTreeBuilder.java:207) >>> ~[flink-table-api-java-1.11.1.jar:1.11.1] >>> at >>> org.apache.flink.table.api.internal.TableImpl.addColumnsOperation(TableImpl.java:475) >>> ~[flink-table-api-java-1.11.1.jar:1.11.1] >>> at >>> org.apache.flink.table.api.internal.TableImpl.addOrReplaceColumns(TableImpl.java:459) >>> ~[flink-table-api-java-1.11.1.jar:1.11.1] >>> >>> Any guidance on how to address this? >>> >>> Regards, >>> Satyam >>> >> >> >> -- >> >> Matthias Pohl | Engineer >> >> Follow us @VervericaData Ververica <https://www.ververica.com/> >> >> -- >> >> Join Flink Forward <https://flink-forward.org/> - The Apache Flink >> Conference >> >> Stream Processing | Event Driven | Real Time >> >> -- >> >> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany >> >> -- >> Ververica GmbH >> Registered at Amtsgericht Charlottenburg: HRB 158244 B >> Managing Directors: Yip Park Tung Jason, Jinwei (Kevin) Zhang, Karl Anton >> Wehner >> > > > -- > > Timo Walther | Software Engineer > > <https://data-artisans.com/> > > > <https://www.ververica.com/> > > > Follow us @VervericaData > > -- > > Join Flink Forward <https://flink-forward.org/> - The Apache Flink > Conference > > Stream Processing | Event Driven | Real Time > > -- > > Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany > > -- > > Ververica GmbH > Registered at Amtsgericht Charlottenburg: HRB 158244 B > Managing Directors: Yip Park Tung Jason, Jinwei (Kevin) Zhang, Karl Anton > Wehner >