Seems you want a temporal table join instead of a two stream join, if that is 
your request, you should use syntax

Join LookupTable FOR SYSTEM_TIME AS OF …

See [1] for details.

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table

Best,
Danny Chan
在 2020年7月21日 +0800 AM6:32,Kelly Smith <kell...@zillowgroup.com>,写道:
> Hi folks,
>
> I have a question Flink SQL. What I want to do is this:
>
>
> • Join a simple lookup table (a few rows) to a stream of data to enrich the 
> stream by adding a column from the lookup table.
>
>
>
> For example, a simple lookup table:
>
> CREATE TABLE LookupTable (
>     `computeClass`  STRING,
>     `multiplier`    FLOAT
> ) WITH (
>     'connector' = 'filesystem',
>     'path' = 'fpu-multipliers.csv',
>     'format' = 'csv'
> )
>
>
> And I’ve got a Kafka connector table with rowtime semantics that has a 
> `computeClass` field. I simply want to join (in a streaming fashion) the 
> `multiplier` field above.
>
> SELECT
>    `timestamp`,
>    // ...
>    ks.computeClass,
>    lt.`multiplier`
> FROM KafkaStream ks
> JOIN LookupTable lt ON ks.computeClass = lt.computeClass
> Doing a simple join like that gives me this error:
>
> “org.apache.flink.table.api.TableException: Rowtime attributes must not be in 
> the input rows of a regular join. As a workaround you can cast the time 
> attributes of input tables to TIMESTAMP before.”
>
> Which leads me to believe that I should use an Interval Join instead, but 
> that doesn’t seem to be appropriate since my table is static and has no 
> concept of time. Basically, I want to hold the entire lookup table in memory, 
> and simply enrich the Kafka stream (which need not be held in memory).
>
> Any ideas on how to accomplish what I’m trying to do?
>
> Thanks!
> Kelly

Reply via email to