[ https://issues.apache.org/jira/browse/FLINK-33597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17787944#comment-17787944 ]
Dawid Wysakowicz edited comment on FLINK-33597 at 11/20/23 11:00 AM:
---------------------------------------------------------------------

I figured out that the problem is the nullability of a field of a nested row. When accessing a NOT NULL field of a NULLABLE ROW, we add an additional CAST to the nullable type, e.g.:

CAST($0.customer_id AS INT)

This behaviour is Flink specific (https://github.com/apache/flink/blob/0b4e32e86342a7b10d612157cefe1dc114801aa1/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkRexBuilder.java#L56).

However, the CAST can be stripped by RexSimplify#simplifyCast. This happens in the case of a JOIN, when the join condition is pushed down. Unfortunately, when rels are later reordered (e.g. when pushing a projection through the watermark assigner), the types mismatch, because we recreate the nested field access with the correct, non-simplified type.

My suggestion would be to disable simplifying CAST for nested field accesses:
https://github.com/confluentinc/flink/blob/c05eab6bd6e8932f9f1ca0b78b5d6c206e7ae04b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/rex/RexSimplify.java#L2245

{code}
private RexNode simplifyCast(RexCall e) {
    RexNode operand = e.getOperands().get(0);
    operand = simplify(operand, UNKNOWN);
    // Keep the CAST on a field of a nested row (struct), so the nullability
    // added by FlinkRexBuilder is not narrowed away by the check below.
    if (operand.isA(SqlKind.FIELD_ACCESS)
            && ((RexFieldAccess) operand).getReferenceExpr().getType().isStruct()) {
        return rexBuilder.makeCast(e.getType(), operand, true);
    }
    if (sameTypeOrNarrowsNullability(e.getType(), operand.getType())) {
        return operand;
    }
    ....
{code}

> Can not use a nested column for a join condition
> ------------------------------------------------
>
>                 Key: FLINK-33597
>                 URL: https://issues.apache.org/jira/browse/FLINK-33597
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>            Reporter: Dawid Wysakowicz
>            Assignee: Dawid Wysakowicz
>            Priority: Major
>             Fix For: 1.19.0
>
>
> Query:
> {code}
> SELECT A.after.CUSTOMER_ID FROM `CUSTOMERS` A INNER JOIN `PRODUCTS` B ON
> A.after.CUSTOMER_ID = B.after.PURCHASER;
> {code}
> fails with:
> {code}
> java.lang.RuntimeException: Error while applying rule FlinkProjectWatermarkAssignerTransposeRule, args [rel#411017:LogicalProject.NONE.any.None: 0.[NONE].[NONE](input=RelSubset#411016,exprs=[$2, $2.CUSTOMER_ID]), rel#411015:LogicalWatermarkAssigner.NONE.any.None: 0.[NONE].[NONE](input=RelSubset#411014,rowtime=$rowtime,watermark=SOURCE_WATERMARK())]
>     at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:250)
> ...
> Caused by: java.lang.IllegalArgumentException: Type mismatch:
> rel rowtype: RecordType(RecordType:peek_no_expand(INTEGER NOT NULL CUSTOMER_ID, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" CUSTOMER_NAME, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" TITLE, INTEGER DOB) after, INTEGER $f8) NOT NULL
> equiv rowtype: RecordType(RecordType:peek_no_expand(INTEGER NOT NULL CUSTOMER_ID, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" CUSTOMER_NAME, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" TITLE, INTEGER DOB) after, INTEGER NOT NULL $f8) NOT NULL
> Difference:
> $f8: INTEGER -> INTEGER NOT NULL
>
>     at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:592)
>     at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:613)
>     at org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:144)
>     ... 50 more
> {code}
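A minimal, self-contained Java sketch of the mechanism described in the comment, under stated assumptions: every class and method here (`Type`, `Expr`, `makeFieldAccess`, `simplifyCast` with a `keepNestedFieldCasts` flag) is a hypothetical stand-in, not Calcite's `RexNode`/`RexBuilder` API. It only models type names and nullability, skips the recursive `simplify(operand, UNKNOWN)` call, and keeps the existing CAST instead of rebuilding it with `rexBuilder.makeCast`. It shows how dropping a nullability-widening CAST turns `INTEGER` into `INTEGER NOT NULL`, the same difference reported for `$f8` in the stack trace.

{code}
// Hypothetical toy model, NOT Calcite's or Flink's API: it only mimics the
// nullability behaviour described in the comment.
public class NestedFieldCastDemo {

    /** A type is a name plus a nullability flag, e.g. INTEGER NOT NULL. */
    record Type(String name, boolean nullable) {
        Type nullableCopy() { return new Type(name, true); }

        @Override
        public String toString() { return nullable ? name : name + " NOT NULL"; }
    }

    interface Expr { Type type(); }

    /** Reference to an input column, e.g. $0. */
    record InputRef(int index, Type type) implements Expr {}

    /** Access to a field of a ROW-typed expression, e.g. $0.customer_id. */
    record FieldAccess(Expr row, String field, Type type) implements Expr {}

    /** CAST(operand AS type). */
    record Cast(Expr operand, Type type) implements Expr {}

    /** Models the described builder behaviour: a NOT NULL field of a nullable ROW is wrapped in a CAST to the nullable type. */
    static Expr makeFieldAccess(Expr row, String field, Type fieldType) {
        Expr access = new FieldAccess(row, field, fieldType);
        if (row.type().nullable() && !fieldType.nullable()) {
            return new Cast(access, fieldType.nullableCopy());
        }
        return access;
    }

    /** True if source equals target, or differs only by being NOT NULL where target is nullable. */
    static boolean sameTypeOrNarrowsNullability(Type target, Type source) {
        return target.name().equals(source.name()) && (target.nullable() || !source.nullable());
    }

    /** Simplified cast removal; keepNestedFieldCasts toggles the proposed guard. */
    static Expr simplifyCast(Cast cast, boolean keepNestedFieldCasts) {
        Expr operand = cast.operand();
        if (keepNestedFieldCasts
                && operand instanceof FieldAccess access
                && access.row().type().name().equals("ROW")) {
            return cast; // proposed guard: keep the CAST on nested field accesses
        }
        if (sameTypeOrNarrowsNullability(cast.type(), operand.type())) {
            return operand; // CAST dropped: the expression type becomes NOT NULL
        }
        return cast;
    }

    public static void main(String[] args) {
        Expr after = new InputRef(0, new Type("ROW", true)); // nullable ROW column
        Expr original = makeFieldAccess(after, "customer_id", new Type("INTEGER", false));

        Expr withoutGuard = simplifyCast((Cast) original, false);
        Expr withGuard = simplifyCast((Cast) original, true);

        System.out.println("original:      " + original.type());
        System.out.println("without guard: " + withoutGuard.type());
        System.out.println("with guard:    " + withGuard.type());
    }
}
{code}

Running `main` prints `INTEGER` for the original expression, `INTEGER NOT NULL` once the CAST is stripped, and `INTEGER` again with the guard enabled, so the type stays stable when the field access is recreated later.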