Hi Kelly,

It looks like you want to join a fact table (from Kafka) with a dimension
table (from the filesystem). A dimension table is one kind of temporal table;
for the temporal table join syntax, see Danny's post [1].

However, `FileSystemTableSource` does not implement the `LookupTableSource`
interface yet, which means you cannot use it as a dimension table. The
connectors that currently support `LookupTableSource` include JDBC, HBase, and
Hive. You can create an issue to request `LookupTableSource` support for the
filesystem connector.
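
For illustration, here is a rough sketch of what the lookup join could look
like if the multipliers lived in a connector that supports lookups. It assumes
the JDBC connector; the URL, the table name `fpu_multipliers`, and the
`proctime` column on KafkaStream are placeholders I made up, not something
from your setup.

```sql
-- Assumption: LookupTable is redefined on a connector that implements
-- LookupTableSource (JDBC here); 'url' and 'table-name' are placeholders.
CREATE TABLE LookupTable (
    `computeClass`  STRING,
    `multiplier`    FLOAT
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://localhost:3306/mydb',
    'table-name' = 'fpu_multipliers'
);

-- Assumption: KafkaStream declares a processing-time attribute,
-- e.g. `proctime AS PROCTIME()` in its DDL.
SELECT
    ks.`timestamp`,
    ks.computeClass,
    lt.`multiplier`
FROM KafkaStream AS ks
JOIN LookupTable FOR SYSTEM_TIME AS OF ks.proctime AS lt
    ON ks.computeClass = lt.computeClass;
```

The join is driven by the processing-time attribute of the Kafka side, so the
rowtime restriction of a regular join should not apply here.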

Another approach is to use a Temporal Table Function [2], which can define a
temporal table from a DataStream: convert your Table (the filesystem table)
to a stream, create a temporal table from it, and then join against that
temporal table.
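
As a sketch of the SQL side of this approach: assume the lookup data has been
turned into a temporal table function (via `createTemporalTableFunction` on
the Table, with a processing-time attribute as the time attribute and
`computeClass` as the key) and registered under the hypothetical name
`Multipliers`. The `proctime` column on KafkaStream is also an assumption.

```sql
-- Assumption: `Multipliers` is a hypothetical temporal table function
-- registered from the lookup data, keyed on computeClass.
-- Assumption: KafkaStream declares `proctime AS PROCTIME()` in its DDL.
SELECT
    ks.`timestamp`,
    ks.computeClass,
    lt.`multiplier`
FROM KafkaStream AS ks,
    LATERAL TABLE (Multipliers(ks.proctime)) AS lt
WHERE ks.computeClass = lt.computeClass;
```

One caveat: Kafka records that arrive before the lookup rows have been read
may not find a match, so it is worth checking the startup behaviour.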


Best
Leonard Xu
[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/temporal_tables.html#defining-temporal-table
[2] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/temporal_tables.html#defining-temporal-table-function


> On July 21, 2020, at 10:07, Danny Chan <yuzhao....@gmail.com> wrote:
> 
> It seems you want a temporal table join instead of a two-stream join. If that
> is what you need, you should use the syntax
> 
> Join LookupTable FOR SYSTEM_TIME AS OF …
> 
> See [1] for details.
> 
> [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table
> 
> Best,
> Danny Chan
> On July 21, 2020 at 6:32 AM +0800, Kelly Smith <kell...@zillowgroup.com> wrote:
>> Hi folks,
>> 
>>  
>> I have a question about Flink SQL. What I want to do is this:
>> 
>>  
>> Join a simple lookup table (a few rows) to a stream of data to enrich the 
>> stream by adding a column from the lookup table.
>>  
>>  
>> For example, a simple lookup table:
>> 
>>  
>> CREATE TABLE LookupTable (
>>     `computeClass`  STRING,
>>     `multiplier`    FLOAT
>> ) WITH (
>>     'connector' = 'filesystem',
>>     'path' = 'fpu-multipliers.csv',
>>     'format' = 'csv'
>> )
>> 
>>  
>>  
>> And I’ve got a Kafka connector table with rowtime semantics that has a 
>> `computeClass` field. I simply want to join (in a streaming fashion) the 
>> `multiplier` field above.
>> 
>>  
>> SELECT
>>     `timestamp`,
>>     -- ...
>>     ks.computeClass,
>>     lt.`multiplier`
>> FROM KafkaStream ks
>> JOIN LookupTable lt ON ks.computeClass = lt.computeClass
>>  
>> Doing a simple join like that gives me this error:
>> 
>>  
>> “org.apache.flink.table.api.TableException: Rowtime attributes must not be 
>> in the input rows of a regular join. As a workaround you can cast the time 
>> attributes of input tables to TIMESTAMP before.”
>> 
>>  
>> Which leads me to believe that I should use an Interval Join instead, but 
>> that doesn’t seem to be appropriate since my table is static and has no 
>> concept of time. Basically, I want to hold the entire lookup table in 
>> memory, and simply enrich the Kafka stream (which need not be held in 
>> memory).
>> 
>>  
>> Any ideas on how to accomplish what I’m trying to do?
>> 
>>  
>> Thanks!
>> 
>> Kelly
>> 
