Tally created FLINK-29275: ----------------------------- Summary: Temporal Table function: Cannot add expression of different type to set Key: FLINK-29275 URL: https://issues.apache.org/jira/browse/FLINK-29275 Project: Flink Issue Type: Bug Affects Versions: 1.15.2 Reporter: Tally
I am useing the temporal table funciton to join two stream like this, but got this error. Any ways to solve this? {code:java} Exception in thread "main" java.lang.AssertionError: Cannot add expression of different type to set: set type is RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" order_id, DECIMAL(32, 2) price, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" currency, TIMESTAMP(3) order_time, TIMESTAMP_LTZ(3) *PROCTIME* NOT NULL proctime, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" currency0, BIGINT conversion_rate, TIMESTAMP(3) update_time, TIMESTAMP_LTZ(3) *PROCTIME* proctime0) NOT NULL expression type is RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" order_id, DECIMAL(32, 2) price, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" currency, TIMESTAMP(3) order_time, TIMESTAMP_LTZ(3) *PROCTIME* NOT NULL proctime, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" currency0, BIGINT conversion_rate, TIMESTAMP(3) update_time, TIMESTAMP_LTZ(3) *PROCTIME* NOT NULL proctime0) NOT NULL set is rel#61:LogicalCorrelate.NONE.any.None: 0.[NONE].[NONE](left=HepRelVertex#59,right=HepRelVertex#60,correlation=$cor0,joinType=inner,requiredColumns={4}) expression is LogicalJoin(condition=[__TEMPORAL_JOIN_CONDITION($4, $7, __TEMPORAL_JOIN_CONDITION_PRIMARY_KEY($5))], joinType=[inner]) LogicalProject(order_id=[$0], price=[$1], currency=[$2], order_time=[$3], proctime=[PROCTIME()]) LogicalTableScan(table=[[default_catalog, default_database, orders]]) LogicalProject(currency=[$0], conversion_rate=[$1], update_time=[$2], proctime=[PROCTIME()]) LogicalTableScan(table=[[default_catalog, default_database, currency_rates]]) {code} Fact Table: {code:java} CREATE TABLE `orders` ( order_id STRING, price DECIMAL(32,2), currency STRING, order_time TIMESTAMP(3), proctime as PROCTIME() ) WITH ( 'properties.bootstrap.servers' = '127.0.0.1:9092', 'properties.group.id' = 'test', 'scan.topic-partition-discovery.interval' = '10000', 'connector' = 'kafka', 'format' = 'json', 'scan.startup.mode' = 'latest-offset', 'topic' = 'test1' ) {code} Build Table: {code:java} CREATE TABLE `currency_rates` ( currency STRING, conversion_rate BIGINT, update_time TIMESTAMP(3), proctime as PROCTIME() ) WITH ( 'properties.bootstrap.servers' = '127.0.0.1:9092', 'properties.group.id' = 'test', 'scan.topic-partition-discovery.interval' = '10000', 'connector' = 'kafka', 'format' = 'json', 'scan.startup.mode' = 'latest-offset', 'topic' = 'test3' ) {code} The way to generate table function: {code:java} TemporalTableFunction table_rate = tEnv.from("currency_rates") .createTemporalTableFunction("update_time", "currency"); tEnv.registerFunction("rates", table_rate); {code} Join logic: {code:java} SELECT order_id, price, s.currency, conversion_rate, order_time FROM orders AS o, LATERAL TABLE (rates(o.proctime)) AS s WHERE o.currency = s.currency {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)