Timo Walther created FLINK-37211:
------------------------------------

Summary: Fix field resolution for PARTITION BY/ORDER BY
Key: FLINK-37211
URL: https://issues.apache.org/jira/browse/FLINK-37211
Project: Flink
Issue Type: Sub-task
Components: Table SQL / Planner
Reporter: Timo Walther

Currently, there is an issue in the SqlNode to RelNode conversion when resolving the fields referenced in PARTITION BY/ORDER BY.

Steps to reproduce: in
https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.java#L119
change the statement to

{code}
util.verifyRelPlan("SELECT * FROM f(r => TABLE t PARTITION BY score, i => 1)");
{code}

Some insights from [~dwysakowicz]:

I think what you see is actually not a problem with scoping. The scope of SqlRexContext cx is correct in

private RexNode convertTableArgs(SqlRexContext cx, final SqlCall call) {

You convert the entire table function, and the cx context has that function's scope. I think the logic you have in FlinkConvertletTable fits better into SqlToRelConverter:
https://github.com/apache/flink/blob/55de8d683cc56733c03c11f927f511b2ba712851/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java#L5394
which can resolve the operands in their own scope, similarly to subqueries.

That's how I managed to compute the correct partition keys, but it now fails while evaluating i => 1.

SqlToRelConverter:

{code}
/**
 * Converts a non-standard expression.
 *
 * <p>This method is an extension-point that derived classes can override. If this method
 * returns a null result, the normal expression translation process will proceed. The default
 * implementation always returns null.
 *
 * @param node Expression
 * @param bb Blackboard
 * @return null to proceed with the usual expression translation process
 */
protected @Nullable RexNode convertExtendedExpression(SqlNode node, Blackboard bb) {
    if (!(node instanceof SqlCall)) {
        return null;
    }
    final SqlCall call = (SqlCall) node;
    final SqlOperator operator = call.getOperator();
    if (operator instanceof SqlTableFunction) {
        final RelDataType returnType = validator.getValidatedNodeType(node);

        final List<RexNode> rewrittenOperands = new ArrayList<>();
        int tableInputCount = 0;
        for (int pos = 0; pos < call.getOperandList().size(); pos++) {
            final SqlNode operand = call.operand(pos);
            if (operand.getKind() == SqlKind.ARGUMENT_ASSIGNMENT
                    && ((SqlCall) operand).operand(0).getKind()
                            == SqlKind.SET_SEMANTICS_TABLE) {
                final SqlNode query = ((SqlCall) operand).operand(0);
                rewrittenOperands.add(
                        convertSetSemanticsArg((SqlCall) query, tableInputCount++));
            } else if (operand.getKind() == SqlKind.SET_SEMANTICS_TABLE) {
                rewrittenOperands.add(
                        convertSetSemanticsArg((SqlCall) operand, tableInputCount++));
            } else if (operand.isA(SqlKind.QUERY)) {
                rewrittenOperands.add(
                        new RexTableArgCall(
                                validator.getValidatedNodeType(operand),
                                tableInputCount++,
                                new int[0],
                                new int[0]));
            } else {
                rewrittenOperands.add(this.convertExpression(operand));
            }
        }

        return rexBuilder.makeCall(returnType, operator, rewrittenOperands);
    }
    return null;
}

private RexNode convertSetSemanticsArg(SqlCall ptf, int tableInputCount) {
    final SqlSelect query = ptf.operand(0);
    final SqlNodeList partitionKeys = ptf.operand(1);
    final SqlNodeList orderKeys = ptf.operand(2);
    Preconditions.checkArgument(
            orderKeys.isEmpty(), "Table functions do not support order keys yet.");
    final SqlValidatorScope queryScope = validator.getSelectScope(query);
    final Blackboard bb = createBlackboard(queryScope, null, false);
    final int[] partitionKeyIndices = getPartitionKeyIndices(bb, partitionKeys);
    return new RexTableArgCall(
            validator.getValidatedNodeType(ptf),
            tableInputCount,
            partitionKeyIndices,
            new int[0]);
}

private static int[] getPartitionKeyIndices(SqlRexContext cx, SqlNodeList partitions) {
    final int[] result = new int[partitions.size()];
    for (int i = 0; i < partitions.getList().size(); i++) {
        final RexNode expr = cx.convertExpression(partitions.get(i));
        result[i] = parseFieldIdx(expr);
    }
    return result;
}
{code}
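
To illustrate why the partition keys have to be resolved in the table argument's own scope rather than in the scope of the enclosing function call, here is a minimal, self-contained sketch. It is a simplified model and does not use Calcite or Flink classes: the class name PartitionKeyScopeSketch, the method resolveKeyIndices, the schema (name, score) for table t, and the single-column function scope are all hypothetical and chosen only for illustration.

```java
import java.util.Arrays;
import java.util.List;

/** Simplified model of scope-dependent field resolution; not Calcite's or Flink's API. */
public class PartitionKeyScopeSketch {

    /** Resolves each key name to its position in the field list of the given scope. */
    static int[] resolveKeyIndices(List<String> scopeFields, List<String> keys) {
        final int[] result = new int[keys.size()];
        for (int i = 0; i < keys.size(); i++) {
            final int idx = scopeFields.indexOf(keys.get(i));
            if (idx < 0) {
                throw new IllegalArgumentException(
                        "Field '" + keys.get(i) + "' not found in scope " + scopeFields);
            }
            result[i] = idx;
        }
        return result;
    }

    public static void main(String[] args) {
        // Assumption: hypothetical schema of table t.
        final List<String> tableScope = List.of("name", "score");
        // Assumption: the enclosing function call's scope does not expose t's columns.
        final List<String> functionScope = List.of("out");
        final List<String> partitionKeys = List.of("score");

        // Resolving against the table's own scope finds the column.
        System.out.println(Arrays.toString(resolveKeyIndices(tableScope, partitionKeys)));

        // Resolving against the function's scope cannot find it.
        try {
            resolveKeyIndices(functionScope, partitionKeys);
        } catch (IllegalArgumentException e) {
            System.out.println("Function scope fails: " + e.getMessage());
        }
    }
}
```

In this model, the lookup only succeeds when it is given the field list of the table argument itself, which mirrors the idea of creating a dedicated Blackboard from the select scope of the query in convertSetSemanticsArg. The scalar argument i => 1 has no such table scope, so it would still need to be converted in the scope of the surrounding call.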