[ 
https://issues.apache.org/jira/browse/FLINK-20539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17860506#comment-17860506
 ] 

xuyang edited comment on FLINK-20539 at 6/27/24 2:47 PM:
---------------------------------------------------------

1. It looks like the original fix didn't resolve this cleanly on the Table API side. Let me briefly explain why the latest query fails:
The type of the *cast* in the first *sqlQuery* is the wrong {*}FULLY_QUALIFIED{*}; when the Calcite tree is stored in Flink as a temporary table through {*}createTemporaryView{*}, it is converted to Flink's own type {*}RowType{*}; when *sqlQuery* is executed again, the Flink *RowType* is converted back to a Calcite *Row* with {*}PEEK_FIELDS_NO_EXPAND{*}, which is no longer consistent with the type in the original Calcite tree.

({color:#ff0000}*Row with FULLY_QUALIFIED in Calcite*{color} -> *RowType in Flink* -> {color:#ff0000}*Row with PEEK_FIELDS_NO_EXPAND in Calcite*{color})
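
To make the round trip concrete, here is a minimal sketch of the failing pattern as I understand it (the table name *src*, its columns, and the exact CAST are illustrative; they are not the reporter's original query):
{code:scala}
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

val tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode())
tEnv.executeSql(
  "CREATE TABLE src (a INT, b STRING) WITH ('connector' = 'datagen')")

// First sqlQuery: the row type of the CAST inside the Calcite tree is the
// (wrong) FULLY_QUALIFIED struct kind.
val view = tEnv.sqlQuery(
  "SELECT CAST(ROW(a, b) AS ROW(a_val STRING, b_val STRING)) AS col FROM src")

// createTemporaryView: the Calcite row type is converted to Flink's own
// RowType in the ResolvedSchema of the temporary table t1.
tEnv.createTemporaryView("t1", view)

// Second sqlQuery: t1's RowType is converted back to a Calcite row, now with
// PEEK_FIELDS_NO_EXPAND, which no longer matches the stored Calcite tree.
tEnv.sqlQuery("SELECT * FROM t1").execute().print()
{code}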

 

_The details, for those interested:_

After executing *sqlQuery*, the *Row* type produced by the *CAST* in the query statement has already become the wrong {*}FULLY_QUALIFIED{*}. When {*}createTemporaryView{*} is executed, we put the Calcite tree into a {*}PlannerQueryOperation{*}, convert the *FULLY_QUALIFIED* *ROW* into Flink's *LogicalType* ({_}FlinkTypeFactory#toLogicalType{_}) to build the *ResolvedSchema*, and store it in the catalog manager as a temporary table (i.e. {*}t1{*}).
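
As a side note (my reading of the conversion, not something this comment proves): Flink's *RowType* only keeps field names and types, so there is no place to carry Calcite's struct kind across this conversion. A tiny sketch of what the schema side ends up holding:
{code:scala}
import org.apache.flink.table.types.logical.{LogicalType, RowType, VarCharType}

// Roughly what the ResolvedSchema of t1 stores for the cast column: a plain
// Flink RowType with names and types only. Nothing here can remember whether
// the Calcite side was FULLY_QUALIFIED or PEEK_FIELDS_NO_EXPAND.
val rowType = RowType.of(
  Array[LogicalType](
    new VarCharType(VarCharType.MAX_LENGTH),
    new VarCharType(VarCharType.MAX_LENGTH)),
  Array("a_val", "b_val"))

// Prints something like: ROW<`a_val` VARCHAR(2147483647), `b_val` VARCHAR(2147483647)>
println(rowType)
{code}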

When *sqlQuery* is executed again, we need to convert the *ResolvedSchema* of the *t1* table back into a type that Calcite can recognize ({_}FlinkTypeFactory#createFieldTypeFromLogicalType{_}). At this point, the cast type becomes {*}PEEK_FIELDS_NO_EXPAND{*}. The mismatch between the type in the Calcite tree ({*}FULLY_QUALIFIED{*}) and the type of the *t1* table after the Flink conversion ({*}PEEK_FIELDS_NO_EXPAND{*}) causes this bug.
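
For anyone who wants to see the mismatch in isolation, here is a small sketch directly against Calcite's type factory (pure Calcite, no Flink; the field layout is illustrative). Types that differ only in struct kind get different digests, which is what {_}SqlToRelConverter#checkConvertedType{_} ends up comparing in the stack trace quoted below:
{code:scala}
import java.util.Arrays

import org.apache.calcite.jdbc.JavaTypeFactoryImpl
import org.apache.calcite.rel.`type`.{RelDataTypeSystem, StructKind}
import org.apache.calcite.sql.`type`.SqlTypeName

val factory = new JavaTypeFactoryImpl(RelDataTypeSystem.DEFAULT)
val fieldTypes = Arrays.asList(
  factory.createSqlType(SqlTypeName.VARCHAR, 10),
  factory.createSqlType(SqlTypeName.VARCHAR, 10))
val fieldNames = Arrays.asList("a_val", "b_val")

// Identical field names and types, different struct kinds:
val fullyQualified =
  factory.createStructType(StructKind.FULLY_QUALIFIED, fieldTypes, fieldNames)
val peekNoExpand =
  factory.createStructType(StructKind.PEEK_FIELDS_NO_EXPAND, fieldTypes, fieldNames)

// The peek_no_expand kind shows up in the type digest, so the two types never
// compare equal even though their fields match. Prints something like:
//   RecordType(VARCHAR(10) a_val, VARCHAR(10) b_val)
//   RecordType:peek_no_expand(VARCHAR(10) a_val, VARCHAR(10) b_val)
println(fullyQualified.getFullTypeString)
println(peekNoExpand.getFullTypeString)
{code}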

 

2. By the way, I tried the following query and found that it does not throw an error, but the resulting plan is subtly wrong. (The corresponding ITCase does not report an error either.)

 
{code:scala}
@Test
def test(): Unit = {
  util.addTable(
    s"""
       |create table t1(
       |  a int,
       |  b varchar
       |) with (
       |  'connector' = 'datagen'
       |)
       """.stripMargin)
  util.verifyExecPlan(
    "SELECT a, b, cast(row(a, b) as row(a_val string, b_val string)) as col FROM t1")
}

// actual wrong plan
Calc(select=[a, b, CAST(ROW(a, b) AS RecordType(VARCHAR(2147483647) a_val, VARCHAR(2147483647) b_val)) AS col])
+- TableSourceScan(table=[[default_catalog, default_database, t1]], fields=[a, b])

// expected correct plan
Calc(select=[a, b, CAST(ROW(a, b) AS RecordType:peek_no_expand(VARCHAR(2147483647) a_val, VARCHAR(2147483647) b_val)) AS col])
+- TableSourceScan(table=[[default_catalog, default_database, t1]], fields=[a, b])
{code}
 

 

I have now determined the cause of the problem and how to fix it; I am adding some test cases and will create a PR later. Because of the inconsistency between row types in Calcite and Flink, I cannot enumerate all possible failing cases in advance. If a query hits the same error in the future, anyone can link it to this Jira and I will address it then.



> Type mismatch when using ROW in computed column
> -----------------------------------------------
>
>                 Key: FLINK-20539
>                 URL: https://issues.apache.org/jira/browse/FLINK-20539
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table SQL / API
>            Reporter: Timo Walther
>            Assignee: xuyang
>            Priority: Major
>              Labels: auto-unassigned, pull-request-available
>             Fix For: 1.19.0, 1.18.2
>
>
> The following SQL:
> {code}
> env.executeSql(
>       "CREATE TABLE Orders (\n"
>       + "    order_number BIGINT,\n"
>       + "    price        INT,\n"
>       + "    first_name   STRING,\n"
>       + "    last_name    STRING,\n"
>       + "    buyer_name AS ROW(first_name, last_name)\n"
>       + ") WITH (\n"
>       + "  'connector' = 'datagen'\n"
>       + ")");
> env.executeSql("SELECT * FROM Orders").print();
> {code}
> Fails with:
> {code}
> Exception in thread "main" java.lang.AssertionError: Conversion to relational 
> algebra failed to preserve datatypes:
> validated type:
> RecordType(BIGINT order_number, INTEGER price, VARCHAR(2147483647) CHARACTER 
> SET "UTF-16LE" first_name, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" 
> last_name, RecordType:peek_no_expand(VARCHAR(2147483647) CHARACTER SET 
> "UTF-16LE" EXPR$0, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" EXPR$1) NOT 
> NULL buyer_name) NOT NULL
> converted type:
> RecordType(BIGINT order_number, INTEGER price, VARCHAR(2147483647) CHARACTER 
> SET "UTF-16LE" first_name, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" 
> last_name, RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" EXPR$0, 
> VARCHAR(2147483647) CHARACTER SET "UTF-16LE" EXPR$1) NOT NULL buyer_name) NOT 
> NULL
> rel:
> LogicalProject(order_number=[$0], price=[$1], first_name=[$2], 
> last_name=[$3], buyer_name=[ROW($2, $3)])
>   LogicalTableScan(table=[[default_catalog, default_database, Orders]])
>       at 
> org.apache.calcite.sql2rel.SqlToRelConverter.checkConvertedType(SqlToRelConverter.java:467)
>       at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:582)
> {code}


