[ 
https://issues.apache.org/jira/browse/FLINK-29275?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tally closed FLINK-29275.
-------------------------
    Resolution: Later

> Temporal Table function: Cannot add expression of different type to set
> -----------------------------------------------------------------------
>
>                 Key: FLINK-29275
>                 URL: https://issues.apache.org/jira/browse/FLINK-29275
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.15.2
>            Reporter: Tally
>            Priority: Major
>
> I am using the temporal table function to join two streams like this, but got 
> this error. Is there any way to solve this?
> {code:java}
> Exception in thread "main" java.lang.AssertionError: Cannot add expression of 
> different type to set:
> set type is RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" order_id, 
> DECIMAL(32, 2) price, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" currency, 
> TIMESTAMP(3) order_time, TIMESTAMP_LTZ(3) *PROCTIME* NOT NULL proctime, 
> VARCHAR(2147483647) CHARACTER SET "UTF-16LE" currency0, BIGINT 
> conversion_rate, TIMESTAMP(3) update_time, TIMESTAMP_LTZ(3) *PROCTIME* 
> proctime0) NOT NULL
> expression type is RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" 
> order_id, DECIMAL(32, 2) price, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" 
> currency, TIMESTAMP(3) order_time, TIMESTAMP_LTZ(3) *PROCTIME* NOT NULL 
> proctime, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" currency0, BIGINT 
> conversion_rate, TIMESTAMP(3) update_time, TIMESTAMP_LTZ(3) *PROCTIME* NOT 
> NULL proctime0) NOT NULL
> set is rel#61:LogicalCorrelate.NONE.any.None: 
> 0.[NONE].[NONE](left=HepRelVertex#59,right=HepRelVertex#60,correlation=$cor0,joinType=inner,requiredColumns={4})
> expression is LogicalJoin(condition=[__TEMPORAL_JOIN_CONDITION($4, $7, 
> __TEMPORAL_JOIN_CONDITION_PRIMARY_KEY($5))], joinType=[inner])
>   LogicalProject(order_id=[$0], price=[$1], currency=[$2], order_time=[$3], 
> proctime=[PROCTIME()])
>     LogicalTableScan(table=[[default_catalog, default_database, orders]])
>   LogicalProject(currency=[$0], conversion_rate=[$1], update_time=[$2], 
> proctime=[PROCTIME()])
>     LogicalTableScan(table=[[default_catalog, default_database, 
> currency_rates]])
>  {code}
> Fact Table:
> {code:java}
> CREATE TABLE `orders` (
>     order_id    STRING,
>     price       DECIMAL(32,2),
>     currency    STRING,
>     order_time  TIMESTAMP(3),
>     proctime as PROCTIME()
>  ) WITH (
>     'properties.bootstrap.servers' = '127.0.0.1:9092',
>     'properties.group.id' = 'test',
>     'scan.topic-partition-discovery.interval' = '10000',
>     'connector' = 'kafka',
>     'format' = 'json',
>     'scan.startup.mode' = 'latest-offset', 
>     'topic' = 'test1'
>   ) {code}
> Build Table:
> {code:java}
> CREATE TABLE `currency_rates` (
>     currency    STRING,
>     conversion_rate BIGINT,
>     update_time  TIMESTAMP(3),
>     proctime as PROCTIME()
>  ) WITH (
>     'properties.bootstrap.servers' = '127.0.0.1:9092',
>     'properties.group.id' = 'test',
>     'scan.topic-partition-discovery.interval' = '10000',
>     'connector' = 'kafka',
>     'format' = 'json',
>     'scan.startup.mode' = 'latest-offset', 
>     'topic' = 'test3'
>   ) {code}
> The way to generate the table function:
> {code:java}
> TemporalTableFunction table_rate = tEnv.from("currency_rates")
> .createTemporalTableFunction("update_time", "currency");
> tEnv.registerFunction("rates", table_rate); {code}
> Join logic:
> {code:java}
>  SELECT
>     order_id,
>     price,
>     s.currency,
>     conversion_rate,
>     order_time
>  FROM orders AS o,  
>  LATERAL TABLE (rates(o.proctime)) AS s
>  WHERE o.currency = s.currency {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
