Thanks Leonard and Danny,

This makes a lot of sense. My hope here is to use only SQL, without any specialized Java/Scala code, so it seems it may not be possible to use either of these methods yet.
I’ll open an issue for the LookupTableSource implementation, and look into the workaround you suggested in the short term.

Thanks!
Kelly

From: Leonard Xu <xbjt...@gmail.com>
Date: Monday, July 20, 2020 at 7:49 PM
To: Danny Chan <yuzhao....@gmail.com>
Cc: Kelly Smith <kell...@zillowgroup.com>, Flink ML <user@flink.apache.org>
Subject: Re: Flink SQL - Join Lookup Table

Hi Kelly,

It looks like you want to join a fact table (from Kafka) with a dimension table (from the filesystem). A dimension table is one kind of temporal table; for the temporal table join syntax, you can refer to Danny's post [1].

However, `FileSystemTableSource` does not implement the `LookupTableSource` interface yet, which means you cannot use it as a dimension table. The connectors that support `LookupTableSource` include JDBC, HBase, and Hive. You can create an issue to support `LookupTableSource` for the filesystem connector.

Another approach is to use a Temporal Table Function [2], which can define a temporal table from a DataStream: convert your Table (the filesystem table) to a stream, create a temporal table from it, and then join against that temporal table.

Best,
Leonard Xu

[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/temporal_tables.html#defining-temporal-table
[2] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/temporal_tables.html#defining-temporal-table-function

On July 21, 2020, at 10:07, Danny Chan <yuzhao....@gmail.com> wrote:

It seems you want a temporal table join instead of a two-stream join. If that is what you need, you should use the syntax JOIN LookupTable FOR SYSTEM_TIME AS OF … See [1] for details.

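A minimal sketch of that syntax applied to the tables from this thread. It assumes two things the thread does not show: that KafkaStream declares a processing-time attribute (the column name `proctime` is hypothetical, e.g. defined as `proctime AS PROCTIME()` in its DDL), and that LookupTable is backed by a connector that supports lookups, which the filesystem connector in the original question does not.

```sql
-- Sketch only: `proctime` is an assumed processing-time column on KafkaStream,
-- and LookupTable must be backed by a lookup-capable connector.
SELECT
  ks.computeClass,
  lt.`multiplier`
FROM KafkaStream AS ks
JOIN LookupTable FOR SYSTEM_TIME AS OF ks.proctime AS lt
  ON ks.computeClass = lt.computeClass
```
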
[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table

Best,
Danny Chan

On July 21, 2020, at 6:32 AM (+0800), Kelly Smith <kell...@zillowgroup.com> wrote:

Hi folks,

I have a question about Flink SQL. What I want to do is this:

* Join a simple lookup table (a few rows) to a stream of data to enrich the stream by adding a column from the lookup table.

For example, a simple lookup table:

    CREATE TABLE LookupTable (
      `computeClass` STRING,
      `multiplier` FLOAT
    ) WITH (
      'connector' = 'filesystem',
      'path' = 'fpu-multipliers.csv',
      'format' = 'csv'
    )

And I’ve got a Kafka connector table with rowtime semantics that has a `computeClass` field. I simply want to join (in a streaming fashion) the `multiplier` field above.

    SELECT
      `timestamp`,
      -- ...
      ks.computeClass,
      lt.`multiplier`
    FROM KafkaStream ks
    JOIN LookupTable lt ON ks.computeClass = lt.computeClass

Doing a simple join like that gives me this error:

“org.apache.flink.table.api.TableException: Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before.”

This leads me to believe that I should use an Interval Join instead, but that doesn’t seem appropriate since my table is static and has no concept of time.

Basically, I want to hold the entire lookup table in memory, and simply enrich the Kafka stream (which need not be held in memory).

Any ideas on how to accomplish what I’m trying to do?

Thanks!
Kelly
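
The workaround named in the error message can be sketched in plain SQL: cast the rowtime column to a regular TIMESTAMP in a subquery before the regular join. This assumes `timestamp` is the rowtime attribute of KafkaStream (the thread does not show that table's DDL); the alias `event_time` is made up for illustration.

```sql
-- Sketch only: assumes `timestamp` is the rowtime attribute of KafkaStream.
-- The cast turns it into an ordinary TIMESTAMP before the join sees it.
SELECT
  ks.event_time,
  ks.computeClass,
  lt.`multiplier`
FROM (
  SELECT
    CAST(`timestamp` AS TIMESTAMP(3)) AS event_time,
    computeClass
  FROM KafkaStream
) ks
JOIN LookupTable lt ON ks.computeClass = lt.computeClass
```

After the cast, `event_time` is no longer a time attribute, so it cannot drive event-time windows downstream. A regular streaming join may also keep rows from both inputs in state, which is not quite the "hold only the lookup table in memory" behaviour asked for above.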