[ https://issues.apache.org/jira/browse/FLINK-35804?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
lincoln lee resolved FLINK-35804. --------------------------------- Assignee: roncenzhao Resolution: Fixed fixed in master: a021082fa062e364e1c296a8ac33315b91c47cdd > Incorrect calc merge generate wrong plan about udtf+join+udf > ------------------------------------------------------------ > > Key: FLINK-35804 > URL: https://issues.apache.org/jira/browse/FLINK-35804 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner > Affects Versions: 1.17.2, 1.18.1, 1.19.1 > Reporter: roncenzhao > Assignee: roncenzhao > Priority: Major > Labels: pull-request-available > > This is the same kind of issue as FLINK-30841. > Take the following test as an example: > {code:java} > @Test > def testCalcMergeWithNonDeterministicExpr3(): Unit = > { > val sqlUdtfQuery = "SELECT a, b, len FROM MyTable, LATERAL TABLE > (length_udtf(c)) AS T(len)" > val sqlView1Query = "SELECT a, b, len " + s"FROM ($sqlUdtfQuery) t JOIN > MyTable_Join t2 " + "ON t.a = t2.d" > val view1 = util.tableEnv.sqlQuery(sqlView1Query) > util.tableEnv.createTemporaryView("View1", view1) > val sqlView2Query = "SELECT random_udf(b) AS r FROM View1" > val view2 = util.tableEnv.sqlQuery(sqlView2Query) > util.tableEnv.createTemporaryView("View2", view2) > val sqlQuery = "SELECT r FROM View2 WHERE r > 10" > util.verifyRelPlan(sqlQuery) > } > {code} > The optimized plan is wrong: > {code:java} > Calc(select=[random_udf(b) AS r]) > +- Join(joinType=[InnerJoin], where=[=(a, d)], select=[a, b, d], > leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) > :- Exchange(distribution=[hash[a]]) > : +- Calc(select=[a, b], where=[>(random_udf(b), 10)]) > : +- Correlate(invocation=[length_udtf($cor0.c)], > correlate=[table(length_udtf($cor0.c))], select=[a,b,c,EXPR$0], > rowType=[RecordType(BIGINT a, INTEGER b, VARCHAR(2147483647) c, INTEGER > EXPR$0)], joinType=[INNER]) > : +- LegacyTableSourceScan(table=[[default_catalog, default_database, > MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) > +- 
Exchange(distribution=[hash[d]]) > +- Calc(select=[d]) > +- LegacyTableSourceScan(table=[[default_catalog, default_database, > MyTable_Join, source: [TestTableSource(d, e, f)]]], fields=[d, e, f]){code} > The expected plan is: > {code:java} > Calc(select=[r], where=[>(r, 10)]) > +- Calc(select=[random_udf(b) AS r]) > +- Join(joinType=[InnerJoin], where=[=(a, d)], select=[a, b, d], > leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) > :- Exchange(distribution=[hash[a]]) > : +- Calc(select=[a, b]) > : +- Correlate(invocation=[length_udtf($cor0.c)], > correlate=[table(length_udtf($cor0.c))], select=[a,b,c,EXPR$0], > rowType=[RecordType(BIGINT a, INTEGER b, VARCHAR(2147483647) c, INTEGER > EXPR$0)], joinType=[INNER]) > : +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, > source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) > +- Exchange(distribution=[hash[d]]) > +- Calc(select=[d]) > +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable_Join, > source: [TestTableSource(d, e, f)]]], fields=[d, e, f]){code} -- This message was sent by Atlassian Jira (v8.20.10#820010)