JIRA: https://issues.apache.org/jira/browse/FLINK-18651
godfrey he <godfre...@gmail.com> wrote on Tue, Jul 21, 2020 at 9:46 AM:

> Hi Kelly,
>
> As the exception message mentions: currently, we must cast the time
> attribute to a regular TIMESTAMP type before we can do a regular join,
> because the time attribute will be out of order after a regular join,
> and then we can't do a window aggregate based on the time attribute.
>
> We can improve this so that the planner implicitly casts the time
> attribute to a regular TIMESTAMP type, and throws an exception only if
> there is an operator after the join that depends on the time attribute,
> such as a window aggregate.
>
> I will create a JIRA to track this.
>
> Best,
> Godfrey
>
> Kelly Smith <kell...@zillowgroup.com> wrote on Tue, Jul 21, 2020 at 6:38 AM:
>
>> Hi folks,
>>
>> I have a question about Flink SQL. What I want to do is this:
>>
>> - Join a simple lookup table (a few rows) to a stream of data to
>>   enrich the stream by adding a column from the lookup table.
>>
>> For example, a simple lookup table:
>>
>> CREATE TABLE LookupTable (
>>     `computeClass` STRING,
>>     `multiplier` FLOAT
>> ) WITH (
>>     'connector' = 'filesystem',
>>     'path' = 'fpu-multipliers.csv',
>>     'format' = 'csv'
>> )
>>
>> And I’ve got a Kafka connector table with rowtime semantics that has a
>> `computeClass` field. I simply want to join (in a streaming fashion) the
>> `multiplier` field above.
>>
>> SELECT `timestamp`,
>>     // ...
>>     ks.computeClass,
>>     lt.`multiplier`
>> FROM KafkaStream ks
>> JOIN LookupTable lt ON ks.computeClass = lt.computeClass
>>
>> Doing a simple join like that gives me this error:
>>
>> “org.apache.flink.table.api.TableException: Rowtime attributes must not
>> be in the input rows of a regular join. As a workaround you can cast the
>> time attributes of input tables to TIMESTAMP before.”
>>
>> Which leads me to believe that I should use an Interval Join instead, but
>> that doesn’t seem to be appropriate since my table is static and has no
>> concept of time. Basically, I want to hold the entire lookup table in
>> memory, and simply enrich the Kafka stream (which need not be held in
>> memory).
>>
>> Any ideas on how to accomplish what I’m trying to do?
>>
>> Thanks!
>>
>> Kelly
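
For reference, the workaround named in the exception message (cast the time attribute to a regular TIMESTAMP before the join) might look roughly like the query below. This is only a sketch: it assumes `timestamp` is the rowtime attribute of KafkaStream, and the alias `ts` is a made-up name for illustration. As noted in the reply above, the cast column is no longer a time attribute, so a window aggregate on it after the join would not work.

```sql
-- Sketch only. Assumes `timestamp` is the rowtime attribute of KafkaStream;
-- the alias ts is hypothetical.
SELECT
    ks.ts,
    ks.computeClass,
    lt.`multiplier`
FROM (
    -- The CAST turns the rowtime attribute into a regular TIMESTAMP column,
    -- so the regular join below no longer has a time attribute in its input.
    SELECT
        CAST(`timestamp` AS TIMESTAMP) AS ts,
        computeClass
    FROM KafkaStream
) ks
JOIN LookupTable lt ON ks.computeClass = lt.computeClass
```

Any other columns needed from KafkaStream would have to be added to the inner SELECT as well.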