Hi Gyula,

First of all, the exception

```
org.apache.flink.table.api.TableException: Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before.
```

is IMHO one of the biggest shortcomings that we currently have in the planners due to internals around time interval joins [0]. But this is a different topic.

I think in theory Gyula is right. However, we would need to store the static table somewhere in order to perform lookups while the stream is passing by. And while checking the time attributes, we would need to know which table is bounded and what kind of changes are coming into the streaming table.
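
As a rough illustration of that idea, here is a toy sketch in plain Java. It is not Flink code, and every class and method name in it is made up for the example. It assumes the bounded table has been fully materialized into a map before the stream is processed, and that timestamps are non-negative milliseconds. Each stream record is enriched by a lookup and keeps its own timestamp, so an event-time window can still be applied afterwards.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model only: plain collections stand in for the Kafka stream and the Hive table.
// None of these names are Flink APIs.
final class StaticLookupJoinSketch {

    /** One stream record: event timestamp (ms), item key, quantity. */
    static final class Order {
        final long ts;
        final String item;
        final int quantity;

        Order(long ts, String item, int quantity) {
            this.ts = ts;
            this.item = item;
            this.quantity = quantity;
        }
    }

    /** One joined record; it carries the stream record's timestamp unchanged. */
    static final class PricedOrder {
        final long ts;
        final double total;

        PricedOrder(long ts, double total) {
            this.ts = ts;
            this.total = total;
        }
    }

    /** Enrich each order by a lookup in the fully materialized static table. */
    static List<PricedOrder> enrich(List<Order> orders, Map<String, Double> prices) {
        List<PricedOrder> out = new ArrayList<>();
        for (Order o : orders) {
            Double price = prices.get(o.item);
            if (price != null) { // inner-join semantics: drop orders without a price
                out.add(new PricedOrder(o.ts, price * o.quantity));
            }
        }
        return out;
    }

    /** Sum the totals per tumbling window, keyed by window start (ms). */
    static Map<Long, Double> windowedTotals(List<PricedOrder> rows, long windowSizeMs) {
        Map<Long, Double> totals = new TreeMap<>();
        for (PricedOrder r : rows) {
            long windowStart = r.ts - (r.ts % windowSizeMs);
            totals.merge(windowStart, r.total, Double::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        Map<String, Double> prices = new HashMap<>();
        prices.put("apple", 2.0);
        prices.put("pear", 3.0);

        List<Order> orders = Arrays.asList(
                new Order(1_000L, "apple", 3),
                new Order(4_000L, "pear", 1),
                new Order(11_000L, "apple", 2));

        // Prints {0=9.0, 10000=4.0}
        System.out.println(windowedTotals(enrich(orders, prices), 10_000L));
    }
}
```

The sketch only shows that the joined rows can inherit the stream record's timestamp. How the planner would learn that the table is bounded, and where it would keep it, is the open part.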

There is still a lot of work in the future to make the concepts smoother.

Regards,
Timo


[0] https://issues.apache.org/jira/browse/FLINK-10211





On 20.04.20 16:09, Gyula Fóra wrote:
The HiveTableSource (and many others) return isBounded() -> true.
In this case it is not even possible for it to change over time, so I am a bit confused.

To me it sounds like you should always be able to join a stream against a bounded table; temporal or not, it is pretty well defined. Maybe there is some fundamental concept that I don't understand; I don't have much experience with this, to be fair.

Gyula

On Mon, Apr 20, 2020 at 4:03 PM Kurt Young <ykt...@gmail.com <mailto:ykt...@gmail.com>> wrote:

    The reason here is Flink doesn't know the hive table is static.
    After you create these two tables and
    try to join them, Flink will assume both tables will be changing
    with time.

    Best,
    Kurt


    On Mon, Apr 20, 2020 at 9:48 PM Gyula Fóra <gyula.f...@gmail.com
    <mailto:gyula.f...@gmail.com>> wrote:

        Hi!

        The problem here is that I don't have a temporal table.

        I have a regular stream from Kafka (with an event time attribute)
        and a static table in Hive.
        The Hive table is static, it doesn't change. It doesn't have any
        time attribute, it's not temporal.

        Gyula

        On Mon, Apr 20, 2020 at 3:43 PM godfrey he <godfre...@gmail.com
        <mailto:godfre...@gmail.com>> wrote:

            Hi Gyula,

            Can you convert the regular join to a lookup join (temporal
            join) [1]?
            Then you can use a window aggregate.

             >  I understand that the problem is that we cannot join
            with the Hive table and still maintain the watermark/event
            time column. But why is this?
            A regular join can't guarantee that the time attribute keeps
            increasing (one record may be joined with a very old record),
            which means the watermark is not guaranteed to increase
            either.

            
            [1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/joins.html#join-with-a-temporal-table

            Best,
            Godfrey

            On Mon, Apr 20, 2020 at 4:46 PM Gyula Fóra <gyula.f...@gmail.com
            <mailto:gyula.f...@gmail.com>> wrote:

                Hi All!

                We hit the following problem with SQL and are trying to
                understand whether there is a valid workaround.

                We have 2 tables:

                _Kafka_
                timestamp (ROWTIME)
                item
                quantity

                _Hive_
                item
                price

                So we basically have incoming (ts, id, quantity) and we
                want to join it with the Hive table to get the total
                price (price * quantity) for the current item.

                After this we want to create a window aggregate on
                quantity*price, windowed on timestamp (event time attribute).

                However we formulate this query, we hit the following
                error:
                org.apache.flink.table.api.TableException: Rowtime
                attributes must not be in the input rows of a regular
                join. As a workaround you can cast the time attributes
                of input tables to TIMESTAMP before.

                  I understand that the problem is that we cannot join
                with the Hive table and still maintain the
                watermark/event time column. But why is this?

                In the DataStream world I would simply assign a max
                watermark to my enrichment input, and join outputs would
                get the ts of the input record. Can I achieve something
                similar in the SQL/Table API?

                Thank you!
                Gyula

