[ https://issues.apache.org/jira/browse/FLINK-29275?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Tally closed FLINK-29275. ------------------------- Resolution: Later > Temporal Table function: Cannot add expression of different type to set > ----------------------------------------------------------------------- > > Key: FLINK-29275 > URL: https://issues.apache.org/jira/browse/FLINK-29275 > Project: Flink > Issue Type: Bug > Affects Versions: 1.15.2 > Reporter: Tally > Priority: Major > > I am useing the temporal table funciton to join two stream like this, but got > this error. Any ways to solve this? > {code:java} > Exception in thread "main" java.lang.AssertionError: Cannot add expression of > different type to set: > set type is RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" order_id, > DECIMAL(32, 2) price, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" currency, > TIMESTAMP(3) order_time, TIMESTAMP_LTZ(3) *PROCTIME* NOT NULL proctime, > VARCHAR(2147483647) CHARACTER SET "UTF-16LE" currency0, BIGINT > conversion_rate, TIMESTAMP(3) update_time, TIMESTAMP_LTZ(3) *PROCTIME* > proctime0) NOT NULL > expression type is RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" > order_id, DECIMAL(32, 2) price, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" > currency, TIMESTAMP(3) order_time, TIMESTAMP_LTZ(3) *PROCTIME* NOT NULL > proctime, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" currency0, BIGINT > conversion_rate, TIMESTAMP(3) update_time, TIMESTAMP_LTZ(3) *PROCTIME* NOT > NULL proctime0) NOT NULL > set is rel#61:LogicalCorrelate.NONE.any.None: > 0.[NONE].[NONE](left=HepRelVertex#59,right=HepRelVertex#60,correlation=$cor0,joinType=inner,requiredColumns={4}) > expression is LogicalJoin(condition=[__TEMPORAL_JOIN_CONDITION($4, $7, > __TEMPORAL_JOIN_CONDITION_PRIMARY_KEY($5))], joinType=[inner]) > LogicalProject(order_id=[$0], price=[$1], currency=[$2], order_time=[$3], > proctime=[PROCTIME()]) > LogicalTableScan(table=[[default_catalog, default_database, orders]]) > LogicalProject(currency=[$0], conversion_rate=[$1], update_time=[$2], > proctime=[PROCTIME()]) > LogicalTableScan(table=[[default_catalog, default_database, > currency_rates]]) > {code} > Fact Table: > {code:java} > CREATE TABLE `orders` ( > order_id STRING, > price DECIMAL(32,2), > currency STRING, > order_time TIMESTAMP(3), > proctime as PROCTIME() > ) WITH ( > 'properties.bootstrap.servers' = '127.0.0.1:9092', > 'properties.group.id' = 'test', > 'scan.topic-partition-discovery.interval' = '10000', > 'connector' = 'kafka', > 'format' = 'json', > 'scan.startup.mode' = 'latest-offset', > 'topic' = 'test1' > ) {code} > Build Table: > {code:java} > CREATE TABLE `currency_rates` ( > currency STRING, > conversion_rate BIGINT, > update_time TIMESTAMP(3), > proctime as PROCTIME() > ) WITH ( > 'properties.bootstrap.servers' = '127.0.0.1:9092', > 'properties.group.id' = 'test', > 'scan.topic-partition-discovery.interval' = '10000', > 'connector' = 'kafka', > 'format' = 'json', > 'scan.startup.mode' = 'latest-offset', > 'topic' = 'test3' > ) {code} > The way to generate table function: > {code:java} > TemporalTableFunction table_rate = tEnv.from("currency_rates") > .createTemporalTableFunction("update_time", "currency"); > tEnv.registerFunction("rates", table_rate); {code} > Join logic: > {code:java} > SELECT > order_id, > price, > s.currency, > conversion_rate, > order_time > FROM orders AS o, > LATERAL TABLE (rates(o.proctime)) AS s > WHERE o.currency = s.currency {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)