[ https://issues.apache.org/jira/browse/FLINK-20539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17860506#comment-17860506 ]
xuyang edited comment on FLINK-20539 at 6/27/24 2:47 PM:
---------------------------------------------------------

1. It looks like the original fix didn't fully resolve this on the Table API.

Let me briefly explain why the latest query failed: the type of the *cast* in the first *sqlQuery* is the wrong {*}FULLY_QUALIFIED{*}; when the Calcite tree is stored in Flink as a temporary table through {*}createTemporaryView{*}, it is converted to Flink's own {*}RowType{*}; when *sqlQuery* is executed again, the Flink *RowType* is converted back to a Calcite row type, but with {*}PEEK_FIELDS_NO_EXPAND{*}, which is no longer consistent with the type in the original Calcite tree ({color:#ff0000}*row with FULLY_QUALIFIED in Calcite*{color} -> RowType in Flink -> {color:#ff0000}*row with PEEK_FIELDS_NO_EXPAND in Calcite*{color}). Two illustrative sketches follow at the end of this comment.

_In detail:_ after executing the first sqlQuery, the *ROW* type of the *CAST* in the query statement has the wrong {*}FULLY_QUALIFIED{*} struct kind. When {*}createTemporaryView{*} is executed, we put the Calcite tree into a {*}PlannerQueryOperation{*} and convert the *FULLY_QUALIFIED* *ROW* into Flink's own *LogicalType* ({_}FlinkTypeFactory#toLogicalType{_}) to form the {*}ResolvedSchema{*}, which is stored in the catalog manager as a temporary table (i.e. {*}t1{*}). When *sqlQuery* is executed again, the ResolvedSchema of the *t1* table must be converted back into a type that Calcite can recognize ({_}FlinkTypeFactory#createFieldTypeFromLogicalType{_}); at this point the cast type becomes {*}PEEK_FIELDS_NO_EXPAND{*}. The difference between the type in the Calcite tree ({*}FULLY_QUALIFIED{*}) and the type of the *t1* table after Flink's conversion ({*}PEEK_FIELDS_NO_EXPAND{*}) causes this bug.

2. By the way, I tried the following query and found that it does not raise an error, but the plan is slightly wrong (although the corresponding ITCase does not fail):

{code:java}
@Test
def test(): Unit = {
  util.addTable(
    s"""
       |create table t1(
       |  a int,
       |  b varchar
       |) with (
       |  'connector' = 'datagen'
       |)
       """.stripMargin)
  util.verifyExecPlan(
    "SELECT a, b, cast(row(a, b) as row(a_val string, b_val string)) as col FROM t1")
}

// actual wrong plan
Calc(select=[a, b, CAST(ROW(a, b) AS RecordType(VARCHAR(2147483647) a_val, VARCHAR(2147483647) b_val)) AS col])
+- TableSourceScan(table=[[default_catalog, default_database, t1]], fields=[a, b])

// expected correct plan
Calc(select=[a, b, CAST(ROW(a, b) AS RecordType:peek_no_expand(VARCHAR(2147483647) a_val, VARCHAR(2147483647) b_val)) AS col])
+- TableSourceScan(table=[[default_catalog, default_database, t1]], fields=[a, b])
{code}

Now I have determined the cause of the problem and how to fix it; I am adding some cases and will create a PR later. Because row types are represented inconsistently in Calcite and Flink, I cannot enumerate all the cases that might fail in the future. If a query hits the same error later, anyone can link it to this Jira and I will address it then.
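For reference, here is a minimal Table API sketch of the failing sequence described in point 1. The exact failing query is in the earlier comments on this ticket; the table name, schema, and cast below are illustrative assumptions, not the original query:

{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class RowStructKindRepro {
    public static void main(String[] args) {
        // Illustrative setup only; the original report used a different schema.
        TableEnvironment env =
            TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        env.executeSql(
            "CREATE TABLE src (a INT, b STRING) WITH ('connector' = 'datagen')");

        // First sqlQuery: the CAST leaves a ROW with the (wrong) FULLY_QUALIFIED
        // struct kind in the Calcite tree.
        Table t = env.sqlQuery(
            "SELECT cast(row(a, b) as row(a_val string, b_val string)) AS col FROM src");

        // createTemporaryView converts the Calcite type into Flink's RowType
        // (FlinkTypeFactory#toLogicalType) and stores it in the catalog manager.
        env.createTemporaryView("t1", t);

        // Second sqlQuery: the RowType is converted back to a Calcite row type
        // (FlinkTypeFactory#createFieldTypeFromLogicalType), now with
        // PEEK_FIELDS_NO_EXPAND, which no longer matches the original tree.
        env.sqlQuery("SELECT * FROM t1").execute().print();
    }
}
{code}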
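To make the mismatch itself concrete, here is a Calcite-only sketch (my own minimal illustration, not Flink's actual conversion code) showing that the struct kind is part of the type digest, so Calcite treats the two row types as different:

{code:java}
import java.util.Arrays;

import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rel.type.RelDataTypeSystem;
import org.apache.calcite.rel.type.StructKind;
import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
import org.apache.calcite.sql.type.SqlTypeName;

public class StructKindDigest {
    public static void main(String[] args) {
        RelDataTypeFactory f = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
        RelDataType varchar = f.createSqlType(SqlTypeName.VARCHAR);

        // What the CAST in the first sqlQuery leaves in the Calcite tree.
        RelDataType fullyQualified = f.createStructType(
            StructKind.FULLY_QUALIFIED,
            Arrays.asList(varchar, varchar),
            Arrays.asList("a_val", "b_val"));

        // What converting the temporary view's RowType back to Calcite yields.
        RelDataType peek = f.createStructType(
            StructKind.PEEK_FIELDS_NO_EXPAND,
            Arrays.asList(varchar, varchar),
            Arrays.asList("a_val", "b_val"));

        // Prints "RecordType(...)" vs "RecordType:peek_no_expand(...)":
        // different digests, hence different types to Calcite.
        System.out.println(fullyQualified.getFullTypeString());
        System.out.println(peek.getFullTypeString());
    }
}
{code}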
> Type mismatch when using ROW in computed column
> -----------------------------------------------
>
>                 Key: FLINK-20539
>                 URL: https://issues.apache.org/jira/browse/FLINK-20539
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table SQL / API
>            Reporter: Timo Walther
>            Assignee: xuyang
>            Priority: Major
>              Labels: auto-unassigned, pull-request-available
>             Fix For: 1.19.0, 1.18.2
>
>
> The following SQL:
> {code}
> env.executeSql(
>     "CREATE TABLE Orders (\n"
>         + " order_number BIGINT,\n"
>         + " price INT,\n"
>         + " first_name STRING,\n"
>         + " last_name STRING,\n"
>         + " buyer_name AS ROW(first_name, last_name)\n"
>         + ") WITH (\n"
>         + " 'connector' = 'datagen'\n"
>         + ")");
> env.executeSql("SELECT * FROM Orders").print();
> {code}
> Fails with:
> {code}
> Exception in thread "main" java.lang.AssertionError: Conversion to relational algebra failed to preserve datatypes:
> validated type:
> RecordType(BIGINT order_number, INTEGER price, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" first_name, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" last_name, RecordType:peek_no_expand(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" EXPR$0, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" EXPR$1) NOT NULL buyer_name) NOT NULL
> converted type:
> RecordType(BIGINT order_number, INTEGER price, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" first_name, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" last_name, RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" EXPR$0, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" EXPR$1) NOT NULL buyer_name) NOT NULL
> rel:
> LogicalProject(order_number=[$0], price=[$1], first_name=[$2], last_name=[$3], buyer_name=[ROW($2, $3)])
>   LogicalTableScan(table=[[default_catalog, default_database, Orders]])
>
> at org.apache.calcite.sql2rel.SqlToRelConverter.checkConvertedType(SqlToRelConverter.java:467)
> at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:582)
> {code}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)