lsyldliu commented on code in PR #23216: URL: https://github.com/apache/flink/pull/23216#discussion_r1299354403
########## flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/program/FlinkRuntimeFilterProgramTest.xml: ########## @@ -430,6 +430,104 @@ MultipleInput(readOrder=[2,0,1], members=[\nHashJoin(joinType=[InnerJoin], where </Resource> </TestCase> + <TestCase name="testBuildSideIsJoinWithTwoAggInputs"> + <Resource name="ast"> + <![CDATA[ +LogicalProject(id=[$0], name=[$1], amount=[$2], price=[$3], fact_date_sk=[$4], dim_date_sk=[$5], EXPR$1=[$6], dim_date_sk0=[$7], EXPR$10=[$8]) ++- LogicalJoin(condition=[=($4, $5)], joinType=[inner]) + :- LogicalTableScan(table=[[testCatalog, test_database, fact]]) + +- LogicalProject(dim_date_sk=[$0], EXPR$1=[$1], dim_date_sk0=[$2], EXPR$10=[$3]) + +- LogicalJoin(condition=[=($0, $2)], joinType=[inner]) + :- LogicalAggregate(group=[{0}], EXPR$1=[SUM($1)]) + : +- LogicalProject(dim_date_sk=[$4], price=[$3]) + : +- LogicalTableScan(table=[[testCatalog, test_database, dim]]) + +- LogicalAggregate(group=[{0}], EXPR$1=[SUM($1)]) + +- LogicalProject(dim_date_sk=[$4], amount=[$2]) + +- LogicalTableScan(table=[[testCatalog, test_database, dim]]) +]]> + </Resource> + <Resource name="optimized rel plan"> + <![CDATA[ +HashJoin(joinType=[InnerJoin], where=[=(fact_date_sk, dim_date_sk)], select=[id, name, amount, price, fact_date_sk, dim_date_sk, EXPR$1, dim_date_sk0, EXPR$10], build=[right]) +:- Exchange(distribution=[hash[fact_date_sk]]) +: +- RuntimeFilter(select=[fact_date_sk], estimatedFilterRatio=[0.9998779296875]) +: :- Exchange(distribution=[broadcast]) +: : +- GlobalRuntimeFilterBuilder(select=[dim_date_sk], estimatedRowCount=[131072], maxRowCount=[436907]) +: : +- Exchange(distribution=[single]) +: : +- LocalRuntimeFilterBuilder(select=[dim_date_sk], estimatedRowCount=[131072], maxRowCount=[436907]) +: : +- TableSourceScan(table=[[testCatalog, test_database, dim, project=[dim_date_sk, amount], metadata=[]]], fields=[dim_date_sk, amount]) +: +- TableSourceScan(table=[[testCatalog, 
test_database, fact]], fields=[id, name, amount, price, fact_date_sk]) ++- HashJoin(joinType=[InnerJoin], where=[=(dim_date_sk, dim_date_sk0)], select=[dim_date_sk, EXPR$1, dim_date_sk0, EXPR$10], build=[right]) + :- HashAggregate(isMerge=[false], groupBy=[dim_date_sk], select=[dim_date_sk, SUM(price) AS EXPR$1]) + : +- Exchange(distribution=[hash[dim_date_sk]]) + : +- Calc(select=[dim_date_sk, price]) + : +- TableSourceScan(table=[[testCatalog, test_database, dim, project=[amount, price, dim_date_sk], metadata=[]]], fields=[amount, price, dim_date_sk]) + +- HashAggregate(isMerge=[false], groupBy=[dim_date_sk], select=[dim_date_sk, SUM(amount) AS EXPR$1]) + +- Exchange(distribution=[hash[dim_date_sk]]) + +- Calc(select=[dim_date_sk, amount]) + +- TableSourceScan(table=[[testCatalog, test_database, dim, project=[amount, price, dim_date_sk], metadata=[]]], fields=[amount, price, dim_date_sk]) +]]> + </Resource> + <Resource name="optimized exec plan"> + <![CDATA[ +MultipleInput(readOrder=[2,1,0], members=[\nHashJoin(joinType=[InnerJoin], where=[(fact_date_sk = dim_date_sk)], select=[id, name, amount, price, fact_date_sk, dim_date_sk, EXPR$1, dim_date_sk0, EXPR$10], build=[right])\n:- [#1] Exchange(distribution=[hash[fact_date_sk]])\n+- HashJoin(joinType=[InnerJoin], where=[(dim_date_sk = dim_date_sk0)], select=[dim_date_sk, EXPR$1, dim_date_sk0, EXPR$10], build=[right])\n :- HashAggregate(isMerge=[false], groupBy=[dim_date_sk], select=[dim_date_sk, SUM(price) AS EXPR$1])\n : +- [#2] Exchange(distribution=[hash[dim_date_sk]])\n +- HashAggregate(isMerge=[false], groupBy=[dim_date_sk], select=[dim_date_sk, SUM(amount) AS EXPR$1])\n +- [#3] Exchange(distribution=[hash[dim_date_sk]])\n]) +:- Exchange(distribution=[hash[fact_date_sk]]) +: +- MultipleInput(readOrder=[0,1], members=[\nRuntimeFilter(select=[fact_date_sk], estimatedFilterRatio=[0.9998779296875])\n:- [#1] Exchange(distribution=[broadcast])\n+- [#2] TableSourceScan(table=[[testCatalog, test_database, fact]], 
fields=[id, name, amount, price, fact_date_sk])\n]) +: :- Exchange(distribution=[broadcast]) +: : +- GlobalRuntimeFilterBuilder(select=[dim_date_sk], estimatedRowCount=[131072], maxRowCount=[436907]) +: : +- Exchange(distribution=[single]) +: : +- LocalRuntimeFilterBuilder(select=[dim_date_sk], estimatedRowCount=[131072], maxRowCount=[436907]) +: : +- TableSourceScan(table=[[testCatalog, test_database, dim, project=[dim_date_sk, amount], metadata=[]]], fields=[dim_date_sk, amount]) Review Comment: From this plan, I found some problems with the implementations of the `BatchPhysicalLocalRuntimeFilterBuilder#copy`, `BatchPhysicalGlobalRuntimeFilterBuilder#copy` and `BatchPhysicalRuntimeFilter#copy` methods. These methods should override `traitSet` and `inputs`; please refer to [1]. [1] https://github.com/apache/flink/blob/9546f8243a24e7b45582b6de6702f819f1d73f97/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalMatch.java#L53 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org