Alan Sheinberg created FLINK-36472: -------------------------------------- Summary: Correlates with UDTF don't handle right-side conditions or projections Key: FLINK-36472 URL: https://issues.apache.org/jira/browse/FLINK-36472 Project: Flink Issue Type: Bug Components: Table SQL / Planner Affects Versions: 1.19.1 Reporter: Alan Sheinberg
Basic Table Function: {code:java} @FunctionHint(output = @DataTypeHint("ROW<s STRING >")) public static class Func extends TableFunction<Row> { public void eval( Integer i) { collect(Row.of("blah " + i)); collect(Row.of("foo " + i)); } }{code} Then a test case: {code:java} @Test public void testTableFuncWithRightCalcWithSelect() { Table t1 = tEnv.fromValues(1, 2).as("f1"); tEnv.createTemporaryView("t1", t1); tEnv.createTemporarySystemFunction("func", new Func()); TableResult result = tEnv.executeSql("select * FROM t1, LATERAL (SELECT CONCAT(foo, ' abc') " + "FROM TABLE(func(f1)) as T(foo))"); final List<Row> results = new ArrayList<>(); result.collect().forEachRemaining(results::add); final List<Row> expectedRows = Arrays.asList( Row.of(1, "blah 1 abc"), Row.of(1, "foo 1 abc"), Row.of(2, "blah 2 abc"), Row.of(2, "foo 2 abc")); assertThat(results).containsSequence(expectedRows); } {code} It has no condition and results in NPE: {code:java} ... Caused by: java.lang.NullPointerException at org.apache.calcite.rex.RexProgram.expandLocalRef(RexProgram.java:549) at org.apache.flink.table.planner.plan.rules.physical.stream.StreamPhysicalCorrelateRule.convertToCorrelate$1(StreamPhysicalCorrelateRule.scala:84) at org.apache.flink.table.planner.plan.rules.physical.stream.StreamPhysicalCorrelateRule.convert(StreamPhysicalCorrelateRule.scala:98) at org.apache.calcite.rel.convert.ConverterRule.onMatch(ConverterRule.java:172){code} Even fixing it, it doesn't work. Looking in the code, it seems to just discard the projections. Running the test above, I get: {code:java} java.lang.AssertionError: Expecting actual: [+I[1, blah 1], +I[1, foo 1], +I[2, blah 2], +I[2, foo 2]] to contain sequence: [+I[1, blah 1 abc], +I[1, foo 1 abc], +I[2, blah 2 abc], +I[2, foo 2 abc]] {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)