Alan Sheinberg created FLINK-36472:
--------------------------------------

             Summary: Correlates with UDTF don't handle right-side conditions 
or projections
                 Key: FLINK-36472
                 URL: https://issues.apache.org/jira/browse/FLINK-36472
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Planner
    Affects Versions: 1.19.1
            Reporter: Alan Sheinberg


Basic Table Function:
{code:java}
@FunctionHint(output = @DataTypeHint("ROW<s STRING >"))
public static class Func extends TableFunction<Row> {

    public void eval( Integer i) {
        collect(Row.of("blah " + i));
        collect(Row.of("foo " + i));
    }
}{code}
Then a test case:
{code:java}
@Test
public void testTableFuncWithRightCalcWithSelect() {
    Table t1 = tEnv.fromValues(1, 2).as("f1");
    tEnv.createTemporaryView("t1", t1);
    tEnv.createTemporarySystemFunction("func", new Func());
    TableResult result = tEnv.executeSql("select * FROM t1, LATERAL (SELECT 
CONCAT(foo, ' abc') "
            + "FROM TABLE(func(f1)) as T(foo))");
    final List<Row> results = new ArrayList<>();
    result.collect().forEachRemaining(results::add);
    final List<Row> expectedRows =
            Arrays.asList(
                    Row.of(1, "blah 1 abc"),
                    Row.of(1, "foo 1 abc"),
                    Row.of(2, "blah 2 abc"),
                    Row.of(2, "foo 2 abc"));
    assertThat(results).containsSequence(expectedRows);
} {code}
It has no condition and results in NPE:
{code:java}
...
Caused by: java.lang.NullPointerException
    at org.apache.calcite.rex.RexProgram.expandLocalRef(RexProgram.java:549)
    at 
org.apache.flink.table.planner.plan.rules.physical.stream.StreamPhysicalCorrelateRule.convertToCorrelate$1(StreamPhysicalCorrelateRule.scala:84)
    at 
org.apache.flink.table.planner.plan.rules.physical.stream.StreamPhysicalCorrelateRule.convert(StreamPhysicalCorrelateRule.scala:98)
    at 
org.apache.calcite.rel.convert.ConverterRule.onMatch(ConverterRule.java:172){code}
Even fixing it, it doesn't work. Looking in the code, it seems to just discard 
the projections.


Running the test above, I get:
{code:java}
java.lang.AssertionError: 
Expecting actual:
  [+I[1, blah 1], +I[1, foo 1], +I[2, blah 2], +I[2, foo 2]]
to contain sequence:
  [+I[1, blah 1 abc], +I[1, foo 1 abc], +I[2, blah 2 abc], +I[2, foo 2 abc]] 
{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to