JIRA: https://issues.apache.org/jira/browse/FLINK-18651

godfrey he <godfre...@gmail.com> wrote on Tue, Jul 21, 2020 at 9:46 AM:

> hi Kelly,
> As the exception message says: currently you must cast the time
> attribute to a regular TIMESTAMP type before you can do a regular join.
> This is because the time attribute can be out of order after a regular
> join, so a window aggregate based on that time attribute is no longer
> possible afterwards.
>
> We could improve this so that the planner implicitly casts the time
> attribute to a regular TIMESTAMP type, and throws an exception only when
> some operator after the join (such as a window aggregate) depends on the
> time attribute.
>
> I will create a JIRA to track this.
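>
> A minimal sketch of that workaround, using the table and column names
> from Kelly's example below (`timestamp` is assumed to be the rowtime
> attribute; note that after the cast it is a plain TIMESTAMP, so it can
> no longer feed a window aggregate downstream):
>
> ```sql
> SELECT
>     ks.ts,
>     ks.computeClass,
>     lt.`multiplier`
> FROM (
>     -- Cast the rowtime attribute to a regular TIMESTAMP so the
>     -- planner allows the regular join.
>     SELECT CAST(`timestamp` AS TIMESTAMP(3)) AS ts, computeClass
>     FROM KafkaStream
> ) ks
> JOIN LookupTable lt ON ks.computeClass = lt.computeClass
> ```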
>
> Best,
> Godfrey
>
> Kelly Smith <kell...@zillowgroup.com> wrote on Tue, Jul 21, 2020 at 6:38 AM:
>
>> Hi folks,
>>
>>
>>
>> I have a question about Flink SQL. What I want to do is this:
>>
>>
>>
>>    - Join a simple lookup table (a few rows) to a stream of data to
>>    enrich the stream by adding a column from the lookup table.
>>
>>
>>
>>
>>
>> For example, a simple lookup table:
>>
>>
>>
>> CREATE TABLE LookupTable (
>>     `computeClass`  STRING,
>>     `multiplier`    FLOAT
>> ) WITH (
>>     'connector' = 'filesystem',
>>     'path' = 'fpu-multipliers.csv',
>>     'format' = 'csv'
>> )
>>
>>
>>
>>
>>
>> And I’ve got a Kafka connector table with rowtime semantics that has a
>> `computeClass` field. I simply want to join (in a streaming fashion) the
>> `multiplier` field above.
>>
>>
>>
>>
>> SELECT
>>     `timestamp`,
>>     // ...
>>     ks.computeClass,
>>     lt.`multiplier`
>> FROM KafkaStream ks
>> JOIN LookupTable lt ON ks.computeClass = lt.computeClass
>>
>>
>>
>> Doing a simple join like that gives me this error:
>>
>>
>>
>> “org.apache.flink.table.api.TableException: Rowtime attributes must not
>> be in the input rows of a regular join. As a workaround you can cast the
>> time attributes of input tables to TIMESTAMP before.”
>>
>>
>>
>> This leads me to believe that I should use an interval join instead,
>> but that doesn't seem appropriate since my lookup table is static and
>> has no concept of time. Basically, I want to hold the entire lookup
>> table in memory and simply enrich the Kafka stream (which need not be
>> held in memory).
>>
>>
>>
>> Any ideas on how to accomplish what I’m trying to do?
>>
>>
>>
>> Thanks!
>>
>> Kelly
>>
>
