Seems you want a temporal table join instead of a two stream join, if that is your request, you should use syntax
Join LookupTable FOR SYSTEM_TIME AS OF … See [1] for details. [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table Best, Danny Chan 在 2020年7月21日 +0800 AM6:32,Kelly Smith <kell...@zillowgroup.com>,写道: > Hi folks, > > I have a question Flink SQL. What I want to do is this: > > > • Join a simple lookup table (a few rows) to a stream of data to enrich the > stream by adding a column from the lookup table. > > > > For example, a simple lookup table: > > CREATE TABLE LookupTable ( > `computeClass` STRING, > `multiplier` FLOAT > ) WITH ( > 'connector' = 'filesystem', > 'path' = 'fpu-multipliers.csv', > 'format' = 'csv' > ) > > > And I’ve got a Kafka connector table with rowtime semantics that has a > `computeClass` field. I simply want to join (in a streaming fashion) the > `multiplier` field above. > > SELECT > `timestamp`, > // ... > ks.computeClass, > lt.`multiplier` > FROM KafkaStream ks > JOIN LookupTable lt ON ks.computeClass = lt.computeClass > Doing a simple join like that gives me this error: > > “org.apache.flink.table.api.TableException: Rowtime attributes must not be in > the input rows of a regular join. As a workaround you can cast the time > attributes of input tables to TIMESTAMP before.” > > Which leads me to believe that I should use an Interval Join instead, but > that doesn’t seem to be appropriate since my table is static and has no > concept of time. Basically, I want to hold the entire lookup table in memory, > and simply enrich the Kafka stream (which need not be held in memory). > > Any ideas on how to accomplish what I’m trying to do? > > Thanks! > Kelly