Aitozi created FLINK-32320: ------------------------------ Summary: Same correlate can not be reused due to the different correlationId Key: FLINK-32320 URL: https://issues.apache.org/jira/browse/FLINK-32320 Project: Flink Issue Type: Improvement Components: Table SQL / Planner Reporter: Aitozi
As describe in SubplanReuserTest {code:java} @Test def testSubplanReuseOnCorrelate(): Unit = { util.addFunction("str_split", new StringSplit()) val sqlQuery = """ |WITH r AS (SELECT a, b, c, v FROM x, LATERAL TABLE(str_split(c, '-')) AS T(v)) |SELECT * FROM r r1, r r2 WHERE r1.v = r2.v """.stripMargin // TODO the sub-plan of Correlate should be reused, // however the digests of Correlates are different util.verifyExecPlan(sqlQuery) } {code} This will produce the plan {code:java} HashJoin(joinType=[InnerJoin], where=[(f0 = f00)], select=[a, b, c, f0, a0, b0, c0, f00], build=[right]) :- Exchange(distribution=[hash[f0]]) : +- Correlate(invocation=[str_split($cor0.c, _UTF-16LE'-')], correlate=[table(str_split($cor0.c,'-'))], select=[a,b,c,f0], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) f0)], joinType=[INNER]) : +- LegacyTableSourceScan(table=[[default_catalog, default_database, x, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) +- Exchange(distribution=[hash[f0]]) +- Correlate(invocation=[str_split($cor1.c, _UTF-16LE'-')], correlate=[table(str_split($cor1.c,'-'))], select=[a,b,c,f0], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) f0)], joinType=[INNER]) +- LegacyTableSourceScan(table=[[default_catalog, default_database, x, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) {code} The Correlate node can not be reused due to the different correlation id. -- This message was sent by Atlassian Jira (v8.20.10#820010)