Yes, the new TableSource API allows emitting retractions. However, it
does not give you direct access to the DataStream API.
FLIP-136 [1] might help you in the near future. We hope it can be part
of 1.12.
Regards,
Timo
[1]
https://lists.apache.org/thread.html/r62b47ec6812ddbafed65ac79e31ca0305099893559f1e5a991dee550%40%3Cdev.flink.apache.org%3E
On 01.09.20 22:55, Satyam Shekhar wrote:
Thanks for your replies Matthias and Timo.
Converting the Table to a DataStream, assigning a new watermark and rowtime
attribute, and converting back to a Table makes sense. One challenge with
that approach is that the Table-to-DataStream conversion could emit a
retractable data stream. However, I think that can now be handled with
the new TableSource API (in 1.11), which allows a TableSource to emit
retractions.
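To illustrate what a retractable stream looks like: Flink's `toRetractStream` encodes each change as a (Boolean, row) pair, where true adds a row and false retracts a previously emitted one. The following is a minimal plain-Java sketch of that encoding for a running count per key. It does not use Flink, and the names `RetractStreamSketch`, `Change`, and `runningCount` are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model (not Flink code) of the changelog produced by an updating aggregate. */
final class RetractStreamSketch {

    /** One change message: add == true inserts a row, add == false retracts an earlier row. */
    static final class Change {
        final boolean add;
        final String key;
        final long count;

        Change(boolean add, String key, long count) {
            this.add = add;
            this.key = key;
            this.count = count;
        }

        @Override
        public String toString() {
            return (add ? "+" : "-") + "(" + key + ", " + count + ")";
        }
    }

    /** Emits the changelog of a running "COUNT(*) GROUP BY key" over the given keys. */
    static List<Change> runningCount(List<String> keys) {
        Map<String, Long> counts = new HashMap<>();
        List<Change> out = new ArrayList<>();
        for (String key : keys) {
            Long old = counts.get(key);
            if (old != null) {
                // The previously emitted result for this key is no longer valid.
                out.add(new Change(false, key, old));
            }
            long updated = (old == null) ? 1L : old + 1L;
            counts.put(key, updated);
            out.add(new Change(true, key, updated));
        }
        return out;
    }

    public static void main(String[] args) {
        // prints [+(a, 1), +(b, 1), -(a, 1), +(a, 2)]
        System.out.println(runningCount(Arrays.asList("a", "b", "a")));
    }
}
```

Any consumer of such a stream, including a source that feeds it back into a table, has to apply the retractions rather than treat every message as an insert.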
I'll try this approach when I migrate to the new API and report back.
Regards,
Satyam
On Tue, Sep 1, 2020 at 4:46 AM Timo Walther <t...@ververica.com> wrote:
Hi Satyam,
Matthias is right. A rowtime attribute cannot be modified and needs
to be passed "as is" through the pipeline. The only exception is
when an operation offers a new rowtime, such as `TUMBLE_ROWTIME` or
`MATCH_ROWTIME`. In your case, you need to define utime as the time
attribute. If this is not possible, you either express the
computation in regular SQL (with non-streaming optimizations) or you
go to the DataStream API, prepare the table there (assign a new
watermark and StreamRecord timestamp), and go back to the Table API.
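As a rough illustration of the "assign new watermark and timestamp" step: the plain-Java sketch below models re-stamping rows with utime and regenerating bounded-out-of-orderness watermarks from it. It is not Flink code. In a real job this step would presumably be done with `DataStream#assignTimestampsAndWatermarks` between the two conversions. The class name, method names, and the `maxOutOfOrdernessMs` parameter are assumptions made for this example, and emitting a watermark after every row is a simplification.

```java
import java.util.Arrays;

/** Plain-Java model (not Flink code) of regenerating watermarks from the utime column. */
final class ReassignRowtimeSketch {

    /**
     * Returns the watermark that would follow each row if timestamps were taken from utime
     * and watermarks trailed the highest utime seen so far by maxOutOfOrdernessMs.
     */
    static long[] watermarksFromUtime(long[] utimes, long maxOutOfOrdernessMs) {
        long[] watermarks = new long[utimes.length];
        // Start value chosen so the subtraction below cannot underflow.
        long maxSeen = Long.MIN_VALUE + maxOutOfOrdernessMs + 1;
        for (int i = 0; i < utimes.length; i++) {
            maxSeen = Math.max(maxSeen, utimes[i]);
            watermarks[i] = maxSeen - maxOutOfOrdernessMs - 1;
        }
        return watermarks;
    }

    /** True if no row's utime is at or behind the watermark emitted before that row. */
    static boolean noLateRows(long[] utimes, long maxOutOfOrdernessMs) {
        long[] watermarks = watermarksFromUtime(utimes, maxOutOfOrdernessMs);
        for (int i = 1; i < utimes.length; i++) {
            if (utimes[i] <= watermarks[i - 1]) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        long[] utimes = {900L, 1950L, 500L, 3900L}; // epoch millis, out of order
        System.out.println(Arrays.toString(watermarksFromUtime(utimes, 2000L)));
        System.out.println(noLateRows(utimes, 2000L)); // prints true
        System.out.println(noLateRows(utimes, 100L));  // prints false
    }
}
```

The point of the sketch is that the watermark bound has to be chosen for the disorder in utime, not for the disorder in the original itime column.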
I hope this helps.
Regards,
Timo
On Tue, Sep 1, 2020 at 11:40 AM Matthias Pohl <matth...@ververica.com> wrote:
Hi Satyam,
Thanks for your post. Unfortunately, it looks like you cannot
change the rowtime column here. The rowtime is strongly coupled
with the watermarks feature. If we change the rowtime column, we
can no longer ensure that the watermarks are still aligned, as Fabian
mentioned in [1].
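A small plain-Java sketch may make the alignment problem concrete. It models watermarks derived from one column and checks rows against them using another column. This is a simplified model, not Flink's implementation: the class `RowtimeSwapSketch`, its `Event` type, the sample values, and the per-row watermark emission are all assumptions made for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.ToLongFunction;

/** Plain-Java model (not Flink code) of why swapping the rowtime column breaks alignment. */
final class RowtimeSwapSketch {

    /** A row of T0 reduced to its two timestamps, in epoch milliseconds. */
    static final class Event {
        final long itime; // original rowtime attribute
        final long utime; // column we would like to use as rowtime instead

        Event(long itime, long utime) {
            this.itime = itime;
            this.utime = utime;
        }
    }

    /**
     * Generates bounded-out-of-orderness watermarks from watermarkColumn and counts the rows
     * whose rowtimeColumn value is at or behind the watermark emitted before them.
     */
    static int countLate(
            List<Event> events,
            long maxOutOfOrdernessMs,
            ToLongFunction<Event> watermarkColumn,
            ToLongFunction<Event> rowtimeColumn) {
        long watermark = Long.MIN_VALUE;
        int late = 0;
        for (Event e : events) {
            if (rowtimeColumn.applyAsLong(e) <= watermark) {
                late++;
            }
            long candidate = watermarkColumn.applyAsLong(e) - maxOutOfOrdernessMs - 1;
            watermark = Math.max(watermark, candidate);
        }
        return late;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
                new Event(1000, 900),
                new Event(2000, 1950),
                new Event(3000, 500),
                new Event(4000, 3900));
        // Watermarks and rowtime both from itime: nothing is late. Prints 0.
        System.out.println(countLate(events, 100, e -> e.itime, e -> e.itime));
        // Watermarks from itime, rowtime switched to utime: one row is late. Prints 1.
        System.out.println(countLate(events, 100, e -> e.itime, e -> e.utime));
    }
}
```

In this model the third row has a utime far behind the itime-based watermark, so a downstream window keyed on utime would see it as late even though it arrived in order with respect to itime.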
@Timo Walther: Could you verify my findings?
Best,
Matthias
[1]
https://stackoverflow.com/questions/52784089/flink-table-sql-api-modify-rowtime-attribute-after-session-window-aggregation
On Mon, Aug 31, 2020 at 6:44 PM Satyam Shekhar <satyamshek...@gmail.com> wrote:
Hello,
I use Flink for continuous evaluation of SQL queries on
streaming data. One of the use cases requires us to run
recursive SQL queries. I am unable to find a way to change the
rowtime attribute of the intermediate result table.
For example, let's assume that there is a table T0 with schema -
root
|-- str1: STRING
|-- int1: BIGINT
|-- utime: TIMESTAMP(3)
|-- itime: TIMESTAMP(3) *ROWTIME*
Now, let's create a view V0 -
var V0 = tEnv_.sqlQuery("select str1, int1, utime, itime from T0");
I wish to change the rowtime of V0 from itime to utime. I
tried doing -
V0 = V0.addOrReplaceColumns($("utime").as("utime").rowtime());
but ran into the following exception -
org.apache.flink.table.api.ValidationException: Window properties can only be used on windowed tables.
    at org.apache.flink.table.operations.utils.OperationTreeBuilder$NoWindowPropertyChecker.visit(OperationTreeBuilder.java:854) ~[flink-table-api-java-1.11.1.jar:1.11.1]
    at org.apache.flink.table.operations.utils.OperationTreeBuilder$NoWindowPropertyChecker.visit(OperationTreeBuilder.java:843) ~[flink-table-api-java-1.11.1.jar:1.11.1]
    at org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:39) ~[flink-table-api-java-1.11.1.jar:1.11.1]
    at org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:132) ~[flink-table-api-java-1.11.1.jar:1.11.1]
    at org.apache.flink.table.operations.utils.OperationTreeBuilder.lambda$project$1(OperationTreeBuilder.java:158) ~[flink-table-api-java-1.11.1.jar:1.11.1]
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1540) ~[na:na]
    at org.apache.flink.table.operations.utils.OperationTreeBuilder.project(OperationTreeBuilder.java:158) ~[flink-table-api-java-1.11.1.jar:1.11.1]
    at org.apache.flink.table.operations.utils.OperationTreeBuilder.addColumns(OperationTreeBuilder.java:207) ~[flink-table-api-java-1.11.1.jar:1.11.1]
    at org.apache.flink.table.api.internal.TableImpl.addColumnsOperation(TableImpl.java:475) ~[flink-table-api-java-1.11.1.jar:1.11.1]
    at org.apache.flink.table.api.internal.TableImpl.addOrReplaceColumns(TableImpl.java:459) ~[flink-table-api-java-1.11.1.jar:1.11.1]
Any guidance on how to address this?
Regards,
Satyam
--
Matthias Pohl | Engineer
Follow us @VervericaData Ververica <https://www.ververica.com/>
--
Join Flink Forward <https://flink-forward.org/> - The Apache
Flink Conference
Stream Processing | Event Driven | Real Time
--
Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Yip Park Tung Jason, Jinwei (Kevin) Zhang,
Karl Anton Wehner
--
Timo Walther | Software Engineer