Hi Kelly,

It looks like you want to join a fact table (from Kafka) with a dimension table (from the filesystem). A dimension table is one kind of Temporal Table [1]; for the temporal table join syntax, you can refer to Danny's post.
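Here is a minimal sketch of the lookup-join syntax, assuming Flink 1.11 SQL. The filesystem connector cannot serve lookups, so this dimension table uses the JDBC connector instead. The URL, table name, and cache settings are hypothetical placeholders. The sketch also assumes `KafkaStream` declares a processing-time attribute as a computed column (for example `proctime AS PROCTIME()`), because `FOR SYSTEM_TIME AS OF` lookup joins are driven by processing time.

```sql
-- Hypothetical lookup-capable dimension table (JDBC connector).
-- URL, table name and cache values are placeholders, not real settings.
CREATE TABLE LookupTable (
  `computeClass` STRING,
  `multiplier` FLOAT
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/mydb',
  'table-name' = 'fpu_multipliers',
  'lookup.cache.max-rows' = '100',
  'lookup.cache.ttl' = '10min'
);

-- Assumes KafkaStream has a computed column: proctime AS PROCTIME()
SELECT
  ks.`timestamp`,
  ks.computeClass,
  lt.`multiplier`
FROM KafkaStream AS ks
JOIN LookupTable FOR SYSTEM_TIME AS OF ks.proctime AS lt
  ON ks.computeClass = lt.computeClass;
```

Each Kafka row triggers a lookup keyed by `computeClass`. With the cache options above, up to 100 looked-up rows are kept for 10 minutes, so a table of only a few rows would mostly be served from memory. The JDBC driver jar must also be on the classpath.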
However, `FileSystemTableSource` does not implement the `LookupTableSource` interface yet, which means you cannot use it as a dimension table. The connectors that currently support `LookupTableSource` include JDBC, HBase, and Hive; you could create an issue to request `LookupTableSource` support for the filesystem connector. Another approach is to use a Temporal Table Function [2], which defines a temporal table from a DataStream: you can convert your Table (the filesystem table) to a stream, create a temporal table function from it, and then join against that temporal table function.

Best,
Leonard Xu

[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/temporal_tables.html#defining-temporal-table
[2] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/temporal_tables.html#defining-temporal-table-function

> On Jul 21, 2020, at 10:07, Danny Chan <yuzhao....@gmail.com> wrote:
>
> Seems you want a temporal table join instead of a two-stream join. If that is
> your request, you should use the syntax
>
> Join LookupTable FOR SYSTEM_TIME AS OF …
>
> See [1] for details.
>
> [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table
>
> Best,
> Danny Chan
>
> On Jul 21, 2020, at 6:32 AM +0800, Kelly Smith <kell...@zillowgroup.com> wrote:
>> Hi folks,
>>
>> I have a question about Flink SQL. What I want to do is this:
>>
>> Join a simple lookup table (a few rows) to a stream of data to enrich the
>> stream by adding a column from the lookup table.
>>
>> For example, a simple lookup table:
>>
>> CREATE TABLE LookupTable (
>>   `computeClass` STRING,
>>   `multiplier` FLOAT
>> ) WITH (
>>   'connector' = 'filesystem',
>>   'path' = 'fpu-multipliers.csv',
>>   'format' = 'csv'
>> )
>>
>> And I’ve got a Kafka connector table with rowtime semantics that has a
>> `computeClass` field. I simply want to join (in a streaming fashion) the
>> `multiplier` field above.
>>
>> SELECT
>>   `timestamp`,
>>   -- ...
>>   ks.computeClass,
>>   lt.`multiplier`
>> FROM KafkaStream ks
>> JOIN LookupTable lt ON ks.computeClass = lt.computeClass
>>
>> Doing a simple join like that gives me this error:
>>
>> “org.apache.flink.table.api.TableException: Rowtime attributes must not be
>> in the input rows of a regular join. As a workaround you can cast the time
>> attributes of input tables to TIMESTAMP before.”
>>
>> Which leads me to believe that I should use an Interval Join instead, but
>> that doesn’t seem to be appropriate since my table is static and has no
>> concept of time. Basically, I want to hold the entire lookup table in
>> memory, and simply enrich the Kafka stream (which need not be held in
>> memory).
>>
>> Any ideas on how to accomplish what I’m trying to do?
>>
>> Thanks!
>>
>> Kelly
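For completeness, here is a sketch of the workaround named in the error message from the original question. It casts the rowtime attribute to a plain `TIMESTAMP(3)` so that the join becomes an ordinary regular join. It assumes `timestamp` is the rowtime attribute of `KafkaStream`; the alias `event_time` is hypothetical.

```sql
-- Materialize the rowtime attribute with CAST so the join below is a
-- regular join. Assumes `timestamp` is KafkaStream's rowtime attribute.
SELECT
  ks.event_time,
  ks.computeClass,
  lt.`multiplier`
FROM (
  SELECT
    CAST(`timestamp` AS TIMESTAMP(3)) AS event_time,  -- no longer a time attribute
    computeClass
  FROM KafkaStream
) AS ks
JOIN LookupTable AS lt
  ON ks.computeClass = lt.computeClass;
```

This avoids the exception, but it does not match the "hold only the lookup table in memory" goal. A regular join keeps both inputs in state, including every Kafka row, so state grows unless idle state retention is configured. In addition, `event_time` can no longer drive event-time windows downstream.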