[ https://issues.apache.org/jira/browse/FLINK-10638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16675073#comment-16675073 ]
ASF GitHub Bot commented on FLINK-10638:
----------------------------------------

pnowojski closed pull request #6981: [FLINK-10638][table] Invalid table scan resolution for temporal join queries
URL: https://github.com/apache/flink/pull/6981

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is reproduced below for the sake of provenance:

diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
index 6a7a921bffb..99e9d7e6d01 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
@@ -449,9 +449,8 @@ abstract class BatchTableEnvironment(
     */
   private[flink] def optimize(relNode: RelNode): RelNode = {
     val convSubQueryPlan = optimizeConvertSubQueries(relNode)
-    val temporalTableJoinPlan = optimizeConvertToTemporalJoin(convSubQueryPlan)
-    val fullNode = optimizeConvertTableReferences(temporalTableJoinPlan)
-    val decorPlan = RelDecorrelator.decorrelateQuery(fullNode)
+    val expandedPlan = optimizeExpandPlan(convSubQueryPlan)
+    val decorPlan = RelDecorrelator.decorrelateQuery(expandedPlan)
     val normalizedPlan = optimizeNormalizeLogicalPlan(decorPlan)
     val logicalPlan = optimizeLogicalPlan(normalizedPlan)
     optimizePhysicalPlan(logicalPlan, FlinkConventions.DATASET)

diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
index 4973f34147f..8c6a1e0a04b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
@@ -804,13 +804,13 @@ abstract class StreamTableEnvironment(
     */
   private[flink] def optimize(relNode: RelNode, updatesAsRetraction: Boolean): RelNode = {
     val convSubQueryPlan = optimizeConvertSubQueries(relNode)
-    val temporalTableJoinPlan = optimizeConvertToTemporalJoin(convSubQueryPlan)
-    val fullNode = optimizeConvertTableReferences(temporalTableJoinPlan)
-    val decorPlan = RelDecorrelator.decorrelateQuery(fullNode)
+    val expandedPlan = optimizeExpandPlan(convSubQueryPlan)
+    val decorPlan = RelDecorrelator.decorrelateQuery(expandedPlan)
     val planWithMaterializedTimeAttributes =
       RelTimeIndicatorConverter.convert(decorPlan, getRelBuilder.getRexBuilder)
     val normalizedPlan = optimizeNormalizeLogicalPlan(planWithMaterializedTimeAttributes)
     val logicalPlan = optimizeLogicalPlan(normalizedPlan)
+
     val physicalPlan = optimizePhysicalPlan(logicalPlan, FlinkConventions.DATASTREAM)
     optimizeDecoratePlan(physicalPlan, updatesAsRetraction)
   }
@@ -827,7 +827,7 @@
     } else {
       relNode
     }
-    runHepPlanner(
+    runHepPlannerSequentially(
       HepMatchOrder.BOTTOM_UP,
       decoRuleSet,
       planToDecorate,
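Taken together, the two hunks above collapse the former optimizeConvertToTemporalJoin and optimizeConvertTableReferences phases into a single optimizeExpandPlan phase. For orientation, here is a commented sketch of the resulting streaming pipeline; the phase order comes from the diff above, while the one-line summaries are explanatory glosses rather than code from the PR:

{code}
// Streaming optimize(...) flow after this change. Phase order is taken from
// the diff above; the per-phase summaries are explanatory glosses.
//
// optimizeConvertSubQueries          rewrite sub-queries into joins
// optimizeExpandPlan                 NEW: temporal-join conversion and table
//                                    reference expansion run as one pass, so
//                                    table scans created by one rule are
//                                    still visited by the other
// RelDecorrelator.decorrelateQuery   remove remaining correlations
// RelTimeIndicatorConverter.convert  materialize time indicator attributes
// optimizeNormalizeLogicalPlan       normalization rules
// optimizeLogicalPlan                cost-based logical optimization
// optimizePhysicalPlan               translate to FlinkConventions.DATASTREAM
// optimizeDecoratePlan               retraction/decoration rules
{code}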
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
index 58831d10271..26f9e50fedd 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -25,7 +25,7 @@ import com.google.common.collect.ImmutableList
 import org.apache.calcite.config.Lex
 import org.apache.calcite.jdbc.CalciteSchema
 import org.apache.calcite.plan.RelOptPlanner.CannotPlanException
-import org.apache.calcite.plan.hep.{HepMatchOrder, HepPlanner, HepProgramBuilder}
+import org.apache.calcite.plan.hep.{HepMatchOrder, HepPlanner, HepProgram, HepProgramBuilder}
 import org.apache.calcite.plan.{Convention, RelOptPlanner, RelOptUtil, RelTraitSet}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.schema.SchemaPlus
@@ -256,34 +256,31 @@ abstract class TableEnvironment(val config: TableConfig) {
   protected def getBuiltInPhysicalOptRuleSet: RuleSet

   protected def optimizeConvertSubQueries(relNode: RelNode): RelNode = {
-    runHepPlanner(
+    runHepPlannerSequentially(
       HepMatchOrder.BOTTOM_UP,
       FlinkRuleSets.TABLE_SUBQUERY_RULES,
       relNode,
       relNode.getTraitSet)
   }

-  protected def optimizeConvertToTemporalJoin(relNode: RelNode): RelNode = {
-    runHepPlanner(
-      HepMatchOrder.BOTTOM_UP,
-      FlinkRuleSets.TEMPORAL_JOIN_RULES,
+  protected def optimizeExpandPlan(relNode: RelNode): RelNode = {
+    val result = runHepPlannerSimultaneously(
+      HepMatchOrder.TOP_DOWN,
+      FlinkRuleSets.EXPAND_PLAN_RULES,
       relNode,
       relNode.getTraitSet)
-  }

-  protected def optimizeConvertTableReferences(relNode: RelNode): RelNode = {
-    runHepPlanner(
-      HepMatchOrder.BOTTOM_UP,
-      FlinkRuleSets.TABLE_REF_RULES,
-      relNode,
-      relNode.getTraitSet)
+    runHepPlannerSequentially(
+      HepMatchOrder.TOP_DOWN,
+      FlinkRuleSets.POST_EXPAND_CLEAN_UP_RULES,
+      result,
+      result.getTraitSet)
   }
-
   protected def optimizeNormalizeLogicalPlan(relNode: RelNode): RelNode = {
     val normRuleSet = getNormRuleSet
     if (normRuleSet.iterator().hasNext) {
-      runHepPlanner(HepMatchOrder.BOTTOM_UP, normRuleSet, relNode, relNode.getTraitSet)
+      runHepPlannerSequentially(HepMatchOrder.BOTTOM_UP, normRuleSet, relNode, relNode.getTraitSet)
     } else {
       relNode
     }
@@ -310,13 +307,16 @@
   }

   /**
-    * run HEP planner
+    * run HEP planner with rules applied one by one. First apply one rule to all of the nodes
+    * and only then apply the next rule. If a rule creates a new node preceding rules will not
+    * be applied to the newly created node.
     */
-  protected def runHepPlanner(
+  protected def runHepPlannerSequentially(
     hepMatchOrder: HepMatchOrder,
     ruleSet: RuleSet,
     input: RelNode,
     targetTraits: RelTraitSet): RelNode = {
+
     val builder = new HepProgramBuilder
     builder.addMatchOrder(hepMatchOrder)
@@ -324,8 +324,36 @@
     while (it.hasNext) {
       builder.addRuleInstance(it.next())
     }
+    runHepPlanner(builder.build(), input, targetTraits)
+  }
+
+  /**
+    * run HEP planner with rules applied simultaneously. Apply all of the rules to the given
+    * node before going to the next one. If a rule creates a new node all of the rules will
+    * be applied to this new node.
+    */
+  protected def runHepPlannerSimultaneously(
+    hepMatchOrder: HepMatchOrder,
+    ruleSet: RuleSet,
+    input: RelNode,
+    targetTraits: RelTraitSet): RelNode = {
+
+    val builder = new HepProgramBuilder
+    builder.addMatchOrder(hepMatchOrder)
+
+    builder.addRuleCollection(ruleSet.asScala.toList.asJava)
+    runHepPlanner(builder.build(), input, targetTraits)
+  }
+
+  /**
+    * run HEP planner
+    */
+  protected def runHepPlanner(
+    hepProgram: HepProgram,
+    input: RelNode,
+    targetTraits: RelTraitSet): RelNode = {
-    val planner = new HepPlanner(builder.build, frameworkConfig.getContext)
+    val planner = new HepPlanner(hepProgram, frameworkConfig.getContext)
     planner.setRoot(input)
     if (input.getTraitSet != targetTraits) {
       planner.changeTraits(input, targetTraits.simplify)
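The sequential/simultaneous distinction above maps directly onto Calcite's HepProgramBuilder: addRuleInstance adds each rule as its own program step, while addRuleCollection adds a single step containing the whole collection. A minimal sketch of the two program shapes follows; the object and method names here are invented for illustration, only the Calcite API calls are real:

{code}
import scala.collection.JavaConverters._

import org.apache.calcite.plan.RelOptRule
import org.apache.calcite.plan.hep.{HepMatchOrder, HepPlanner, HepProgram, HepProgramBuilder}
import org.apache.calcite.rel.RelNode

object HepProgramShapes {

  // Sequential: one program step per rule. Rule N is exhausted on the whole
  // plan before rule N+1 ever fires, so nodes created by a later rule are
  // never revisited by an earlier one.
  def sequential(rules: Seq[RelOptRule]): HepProgram = {
    val builder = new HepProgramBuilder
    builder.addMatchOrder(HepMatchOrder.BOTTOM_UP)
    rules.foreach(rule => builder.addRuleInstance(rule))
    builder.build()
  }

  // Simultaneous: a single step holding all rules. Every node is matched
  // against the whole collection, including nodes created while the step is
  // running -- the property the temporal-join conversion needs, because
  // LogicalCorrelateToTemporalTableJoinRule creates table scans that
  // TableScanRule must still expand.
  def simultaneous(rules: Seq[RelOptRule]): HepProgram = {
    val builder = new HepProgramBuilder
    builder.addMatchOrder(HepMatchOrder.TOP_DOWN)
    builder.addRuleCollection(rules.asJava)
    builder.build()
  }

  // Running either shape (the PR additionally passes a planner context,
  // omitted here for brevity).
  def run(program: HepProgram, input: RelNode): RelNode = {
    val planner = new HepPlanner(program)
    planner.setRoot(input)
    planner.findBestExp()
  }
}
{code}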
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
index 6e2ccdeba5b..5e0ee32ad6c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
@@ -39,18 +39,14 @@
     SubQueryRemoveRule.JOIN)

   /**
-    * Handles proper conversion of correlate queries with temporal table functions
-    * into temporal table joins. This can create new table scans in the plan.
+    * Expand plan by replacing references to tables into a proper plan sub trees. Those rules
+    * can create new plan nodes.
     */
-  val TEMPORAL_JOIN_RULES: RuleSet = RuleSets.ofList(
-    LogicalCorrelateToTemporalTableJoinRule.INSTANCE
-  )
+  val EXPAND_PLAN_RULES: RuleSet = RuleSets.ofList(
+    LogicalCorrelateToTemporalTableJoinRule.INSTANCE,
+    TableScanRule.INSTANCE)

-  /**
-    * Convert table references before query decorrelation.
-    */
-  val TABLE_REF_RULES: RuleSet = RuleSets.ofList(
-    TableScanRule.INSTANCE,
+  val POST_EXPAND_CLEAN_UP_RULES: RuleSet = RuleSets.ofList(
     EnumerableToLogicalTableScan.INSTANCE)

   val LOGICAL_OPT_RULES: RuleSet = RuleSets.ofList(

diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/TemporalTableJoinTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/TemporalTableJoinTest.scala
index 3c47f562aae..27c40bbbef2 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/TemporalTableJoinTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/TemporalTableJoinTest.scala
@@ -87,7 +87,9 @@ class TemporalTableJoinTest extends TableTestBase {
     val ratesHistory = util.addTable[(Timestamp, String, String, Int, Int)](
       "RatesHistory", 'rowtime.rowtime, 'comment, 'currency, 'rate, 'secondary_key)
-    val rates = ratesHistory.createTemporalTableFunction('rowtime, 'currency)
+    val rates = ratesHistory
+      .filter('rate > 110L)
+      .createTemporalTableFunction('rowtime, 'currency)
     util.addFunction("Rates", rates)

     val sqlQuery =

diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/TemporalTableJoinTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/TemporalTableJoinTest.scala
index f8d49238430..299c14417b6 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/TemporalTableJoinTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/TemporalTableJoinTest.scala
@@ -87,7 +87,9 @@ class TemporalTableJoinTest extends TableTestBase {
     val ratesHistory = util.addTable[(Timestamp, String, String, Int, Int)](
       "RatesHistory", 'rowtime.rowtime, 'comment, 'currency, 'rate, 'secondary_key)
-    val rates = ratesHistory.createTemporalTableFunction('rowtime, 'currency)
+    val rates = ratesHistory
+      .filter('rate > 110L)
+      .createTemporalTableFunction('rowtime, 'currency)
     util.addFunction("Rates", rates)

     val result = orders
@@ -226,7 +228,8 @@
       unaryNode(
         "DataStreamCalc",
         streamTableNode(2),
-        term("select", "rowtime, currency, rate, secondary_key")
+        term("select", "rowtime, currency, rate, secondary_key"),
+        term("where", ">(rate, 110)")
       ),
       term("where",
         "AND(" +
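The tests' query text is elided from the diff. For context, a temporal table function registered under "Rates" is consumed from SQL through a LATERAL TABLE join; a hedged sketch of such a query follows (the field names o_amount, o_currency, o_rowtime follow the Orders table used in the tests, but this is not their exact query text):

{code}
// A typical temporal table join against the registered "Rates" function.
// Illustrative only; the tests' exact query text is not part of this diff.
val sqlQuery =
  """
    |SELECT o_amount * rate AS amount
    |FROM Orders,
    |  LATERAL TABLE (Rates(o_rowtime))
    |WHERE currency = o_currency
    |""".stripMargin
{code}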
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/TemporalJoinITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/TemporalJoinITCase.scala
index df0f01be0d5..0fb175370fb 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/TemporalJoinITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/TemporalJoinITCase.scala
@@ -127,8 +127,6 @@ class TemporalJoinITCase extends StreamingWithStateTestBase {
     var expectedOutput = new mutable.HashSet[String]()
     expectedOutput += (2 * 114).toString
-    expectedOutput += (1 * 102).toString
-    expectedOutput += (50 * 1).toString
     expectedOutput += (3 * 116).toString

     val orders = env
@@ -142,11 +140,15 @@
     tEnv.registerTable("Orders", orders)
     tEnv.registerTable("RatesHistory", ratesHistory)
+    tEnv.registerTable("FilteredRatesHistory", tEnv.scan("RatesHistory").filter('rate > 110L))
     tEnv.registerFunction(
       "Rates",
-      ratesHistory.createTemporalTableFunction('rowtime, 'currency))
+      tEnv.scan("FilteredRatesHistory").createTemporalTableFunction('rowtime, 'currency))
+    tEnv.registerTable("TemporalJoinResult", tEnv.sqlQuery(sqlQuery))

-    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+    // Scan from registered table to test for interplay between
+    // LogicalCorrelateToTemporalTableJoinRule and TableScanRule
+    val result = tEnv.scan("TemporalJoinResult").toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink[Row])
     env.execute()

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Invalid table scan resolution for temporal join queries
> --------------------------------------------------------
>
>                 Key: FLINK-10638
>                 URL: https://issues.apache.org/jira/browse/FLINK-10638
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>            Reporter: Timo Walther
>            Assignee: Piotr Nowojski
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.7.0
>
>
> Registered tables that contain a temporal join are not properly resolved when performing a table scan: registering such a table and then reading from it again fails with a planning error on a plan like the following.
> {code}
> LogicalProject(amount=[*($0, $4)])
>   LogicalFilter(condition=[=($3, $1)])
>     LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{2}])
>       LogicalTableScan(table=[[_DataStreamTable_0]])
>       LogicalTableFunctionScan(invocation=[Rates(CAST($cor0.rowtime):TIMESTAMP(3) NOT NULL)], rowType=[RecordType(VARCHAR(65536) currency, BIGINT rate, TIME ATTRIBUTE(ROWTIME) rowtime)], elementType=[class [Ljava.lang.Object;])
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
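The failure mode in the issue description above can be condensed into the pattern the updated TemporalJoinITCase now covers. A sketch, assuming a StreamTableEnvironment `tEnv` with "Orders", "RatesHistory", and the "Rates" temporal table function registered as in the test, and `sqlQuery` holding a temporal join query like the one sketched earlier:

{code}
import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

// Register the result of a temporal join query as a table...
tEnv.registerTable("TemporalJoinResult", tEnv.sqlQuery(sqlQuery))

// ...and read it back through a scan. Before this fix, planning failed here:
// the temporal-join conversion phase ran before table references were
// expanded, so the LogicalCorrelate/LogicalTableFunctionScan pair hidden
// inside the registered table (see the plan in the description) was never
// rewritten into a temporal table join.
val result = tEnv.scan("TemporalJoinResult").toAppendStream[Row]
{code}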