Yes, the new TableSource API allows to emit retractions. However, it does not give you direct access to DataStream API.

FLIP-136 [1] might help you in the near future. We hope it can be part of 1.12.

Regards,
Timo

[1] https://lists.apache.org/thread.html/r62b47ec6812ddbafed65ac79e31ca0305099893559f1e5a991dee550%40%3Cdev.flink.apache.org%3E

On 01.09.20 22:55, Satyam Shekhar wrote:
Thanks for your replies Matthias and Timo.

Converting the Table to DataStream, assigning a new Watermark & Rowtime attribute, and converting back to Table makes sense. One challenge with that approach is that Table to DataStream conversion could emit retractable data stream, however, I think, that can now be handled with the new TableSource API (in 1.11) that allows TableSource to emit retractions.

I'll try this approach when I migrate to the new API and report back.

Regards,
Satyam

On Tue, Sep 1, 2020 at 4:46 AM Timo Walther <t...@ververica.com <mailto:t...@ververica.com>> wrote:

    Hi Satyam,

    Matthias is right. A rowtime attribute cannot be modified and needs
    to be passed "as is" through the pipeline. The only exceptions are
    if a newer rowtime is offered such as `TUMBLE_ROWTIME` or
    `MATCH_ROWTIME`. In your case, you need to define utime as the time
    attribute. If this is not possible, you either express the
    computation in regular SQL (with non-streaming optimizations) or you
    go to DataStream API prepare the table (assign new watermark and
    StreamRecord timestamp there) and go back to Table API.

    I hope this helps.

    Regards,
    Timo

    On Tue, Sep 1, 2020 at 11:40 AM Matthias Pohl
    <matth...@ververica.com <mailto:matth...@ververica.com>> wrote:

        Hi Satyam,
        Thanks for your post. Unfortunately, it looks like you cannot
        change the rowtime column here. The rowtime is strongly coupled
        with the Watermarks feature. By changing the rowtime column we
        cannot ensure that the watermarks are still aligned as Fabian
        mentioned in [1].

        @Timo Walther <mailto:t...@ververica.com> : Could you verify my
        findings?

        Best,
        Matthias

        [1]
        
https://stackoverflow.com/questions/52784089/flink-table-sql-api-modify-rowtime-attribute-after-session-window-aggregation

        On Mon, Aug 31, 2020 at 6:44 PM Satyam Shekhar
        <satyamshek...@gmail.com <mailto:satyamshek...@gmail.com>> wrote:

            Hello,

            I use Flink for continuous evaluation of SQL queries on
            streaming data. One of the use cases requires us to run
            recursive SQL queries. I am unable to find a way to edit
            rowtime time attribute of the intermediate result table.

            For example, let's assume that there is a table T0 with schema -
            root
              |-- str1: STRING
              |-- int1: BIGINT
              |-- utime: TIMESTAMP(3)
              |-- itime: TIMESTAMP(3) *ROWTIME*

            Now, let's create a view V0 -
            var V0 = tEnv_.sqlQuery("select str1, int1, utime, itime
            from T0");

            I wish to change the rowtime of V0 from itime to utime. I
            tried doing -

            V0 = V0.addOrReplaceColumns($("utime").as("utime").rowtime());

            but ran into the following exception -

            org.apache.flink.table.api.ValidationException: Window
            properties can only be used on windowed tables.
            at
            
org.apache.flink.table.operations.utils.OperationTreeBuilder$NoWindowPropertyChecker.visit(OperationTreeBuilder.java:854)
            ~[flink-table-api-java-1.11.1.jar:1.11.1]
            at
            
org.apache.flink.table.operations.utils.OperationTreeBuilder$NoWindowPropertyChecker.visit(OperationTreeBuilder.java:843)
            ~[flink-table-api-java-1.11.1.jar:1.11.1]
            at
            
org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:39)
            ~[flink-table-api-java-1.11.1.jar:1.11.1]
            at
            
org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:132)
            ~[flink-table-api-java-1.11.1.jar:1.11.1]
            at
            
org.apache.flink.table.operations.utils.OperationTreeBuilder.lambda$project$1(OperationTreeBuilder.java:158)
            ~[flink-table-api-java-1.11.1.jar:1.11.1]
            at
            java.base/java.util.ArrayList.forEach(ArrayList.java:1540)
            ~[na:na]
            at
            
org.apache.flink.table.operations.utils.OperationTreeBuilder.project(OperationTreeBuilder.java:158)
            ~[flink-table-api-java-1.11.1.jar:1.11.1]
            at
            
org.apache.flink.table.operations.utils.OperationTreeBuilder.addColumns(OperationTreeBuilder.java:207)
            ~[flink-table-api-java-1.11.1.jar:1.11.1]
            at
            
org.apache.flink.table.api.internal.TableImpl.addColumnsOperation(TableImpl.java:475)
            ~[flink-table-api-java-1.11.1.jar:1.11.1]
            at
            
org.apache.flink.table.api.internal.TableImpl.addOrReplaceColumns(TableImpl.java:459)
            ~[flink-table-api-java-1.11.1.jar:1.11.1]

            Any guidance on how to address this?

            Regards,
            Satyam



--
        Matthias Pohl| Engineer


        Follow us @VervericaData Ververica <https://www.ververica.com/>

        --

        Join Flink Forward <https://flink-forward.org/>- The Apache
        FlinkConference

        Stream Processing | Event Driven | Real Time

        --

        Ververica GmbH| Invalidenstrasse 115, 10115 Berlin, Germany

        --

        Ververica GmbH
        Registered at Amtsgericht Charlottenburg: HRB 158244 B
        Managing Directors: Yip Park Tung Jason, Jinwei (Kevin) Zhang,
        Karl Anton Wehner



--
    Timo Walther| Software Engineer

    <https://data-artisans.com/>


    <https://www.ververica.com/>


    Follow us @VervericaData

    --

    Join Flink Forward <https://flink-forward.org/>- The Apache
    FlinkConference

    Stream Processing | Event Driven | Real Time

    --

    Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

    --

    Ververica GmbH
    Registered at Amtsgericht Charlottenburg: HRB 158244 B
    Managing Directors: Yip Park Tung Jason, Jinwei (Kevin) Zhang, Karl
    Anton Wehner


Reply via email to