Timo Walther created FLINK-37211:
------------------------------------

             Summary: Fix field resolution for PARTITION BY/ORDER BY
                 Key: FLINK-37211
                 URL: https://issues.apache.org/jira/browse/FLINK-37211
             Project: Flink
          Issue Type: Sub-task
          Components: Table SQL / Planner
            Reporter: Timo Walther


Currently, there is an issue in the SqlNode-to-RelNode conversion when resolving the
PARTITION BY/ORDER BY clauses of table arguments.

Steps to reproduce:
https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.java#L119

and change the statement to
{code}
util.verifyRelPlan("SELECT * FROM f(r => TABLE t PARTITION BY score, i => 1)");
{code}

Some insights from [~dwysakowicz]:

I think what you see is actually not a problem with scoping. The scope of the
SqlRexContext cx is correct in private RexNode convertTableArgs(SqlRexContext cx,
final SqlCall call): you convert the entire table function, and the cx context has
that function's scope. I think the logic you have in FlinkConvertletTable fits
better into SqlToRelConverter:
https://github.com/apache/flink/blob/55de8d683cc56733c03c11f927f511b2ba712851/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java#L5394
which can resolve the operands in their own scope, similarly to sub-queries.
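
As a plain-Java illustration of that idea (a minimal sketch with hypothetical names, no Calcite dependency): each PARTITION BY key is looked up in the row type of its own table argument rather than in the enclosing query's scope.

```java
import java.util.Arrays;
import java.util.List;

public class PartitionKeyLookup {

    /**
     * Stand-in for per-argument scope resolution: each PARTITION BY key is
     * resolved against the field names of its own table argument, producing
     * the field index that a RexTableArgCall would carry.
     */
    static int[] resolvePartitionKeys(List<String> tableArgFields, List<String> partitionKeys) {
        final int[] indices = new int[partitionKeys.size()];
        for (int i = 0; i < partitionKeys.size(); i++) {
            final int idx = tableArgFields.indexOf(partitionKeys.get(i));
            if (idx < 0) {
                throw new IllegalArgumentException("Unknown partition key: " + partitionKeys.get(i));
            }
            indices[i] = idx;
        }
        return indices;
    }

    public static void main(String[] args) {
        // Hypothetical row type for TABLE t in the repro, e.g. (name, score).
        final List<String> rowType = List.of("name", "score");
        System.out.println(Arrays.toString(resolvePartitionKeys(rowType, List.of("score")))); // prints [1]
    }
}
```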

That's how I managed to compute correct partition keys, but it now fails while
evaluating i => 1. The modified SqlToRelConverter code:

{code}
    /**
     * Converts a non-standard expression.
     *
     * <p>This method is an extension-point that derived classes can override. If this method
     * returns a null result, the normal expression translation process will proceed. The default
     * implementation always returns null.
     *
     * @param node Expression
     * @param bb Blackboard
     * @return null to proceed with the usual expression translation process
     */
    protected @Nullable RexNode convertExtendedExpression(SqlNode node, Blackboard bb) {
        if (!(node instanceof SqlCall)) {
            return null;
        }
        final SqlCall call = (SqlCall) node;
        final SqlOperator operator = call.getOperator();
        if (operator instanceof SqlTableFunction) {
            final RelDataType returnType = validator.getValidatedNodeType(node);

            final List<RexNode> rewrittenOperands = new ArrayList<>();
            int tableInputCount = 0;
            for (int pos = 0; pos < call.getOperandList().size(); pos++) {
                final SqlNode operand = call.operand(pos);
                if (operand.getKind() == SqlKind.ARGUMENT_ASSIGNMENT
                        && ((SqlCall) operand).operand(0).getKind() == SqlKind.SET_SEMANTICS_TABLE) {
                    final SqlNode query = ((SqlCall) operand).operand(0);
                    rewrittenOperands.add(convertSetSemanticsArg((SqlCall) query, tableInputCount++));
                } else if (operand.getKind() == SqlKind.SET_SEMANTICS_TABLE) {
                    rewrittenOperands.add(convertSetSemanticsArg((SqlCall) operand, tableInputCount++));
                } else if (operand.isA(SqlKind.QUERY)) {
                    rewrittenOperands.add(
                            new RexTableArgCall(
                                    validator.getValidatedNodeType(operand),
                                    tableInputCount++,
                                    new int[0],
                                    new int[0]));
                } else {
                    rewrittenOperands.add(this.convertExpression(operand));
                }
            }

            return rexBuilder.makeCall(returnType, operator, rewrittenOperands);
        }
        return null;
    }

    private RexNode convertSetSemanticsArg(SqlCall ptf, int tableInputCount) {
        final SqlSelect query = ptf.operand(0);
        final SqlNodeList partitionKeys = ptf.operand(1);
        final SqlNodeList orderKeys = ptf.operand(2);
        Preconditions.checkArgument(
                orderKeys.isEmpty(), "Table functions do not support order keys yet.");
        final SqlValidatorScope queryScope = validator.getSelectScope(query);
        final Blackboard bb = createBlackboard(queryScope, null, false);
        final int[] partitionKeyIndices = getPartitionKeyIndices(bb, partitionKeys);
        return new RexTableArgCall(
                validator.getValidatedNodeType(ptf),
                tableInputCount,
                partitionKeyIndices,
                new int[0]);
    }

    private static int[] getPartitionKeyIndices(SqlRexContext cx, SqlNodeList partitions) {
        final int[] result = new int[partitions.size()];
        for (int i = 0; i < partitions.getList().size(); i++) {
            final RexNode expr = cx.convertExpression(partitions.get(i));
            result[i] = parseFieldIdx(expr);
        }
        return result;
    }
{code}
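
The failure on i => 1 presumably comes from the final else branch above: scalar operands go through this.convertExpression(operand), which uses the converter's current context rather than a scope tied to the enclosing call. A self-contained sketch of that dispatch (plain Java with hypothetical names; the real code operates on Calcite's SqlNode/RexNode types):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class OperandRewriteSketch {

    // Hypothetical stand-ins for the two operand shapes handled by the loop.
    sealed interface Operand permits TableArg, ScalarArg {}
    record TableArg(List<String> rowType, List<String> partitionKeys) implements Operand {}
    record ScalarArg(Object value) implements Operand {}

    /**
     * Mirrors the rewrite loop: set-semantics table arguments become
     * positional table-arg calls with resolved partition-key indices, while
     * scalar arguments (such as i => 1) must be converted as ordinary
     * expressions in the enclosing call's scope.
     */
    static List<String> rewriteOperands(List<Operand> operands) {
        final List<String> rewritten = new ArrayList<>();
        int tableInputCount = 0;
        for (Operand operand : operands) {
            if (operand instanceof TableArg t) {
                final int[] keys = t.partitionKeys().stream()
                        .mapToInt(t.rowType()::indexOf)
                        .toArray();
                rewritten.add("tableArg#" + tableInputCount++ + Arrays.toString(keys));
            } else if (operand instanceof ScalarArg s) {
                rewritten.add("literal(" + s.value() + ")");
            }
        }
        return rewritten;
    }

    public static void main(String[] args) {
        // Shape of f(r => TABLE t PARTITION BY score, i => 1); row type is hypothetical.
        final List<String> result = rewriteOperands(List.of(
                new TableArg(List.of("name", "score"), List.of("score")),
                new ScalarArg(1)));
        System.out.println(result); // prints [tableArg#0[1], literal(1)]
    }
}
```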



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
