[ https://issues.apache.org/jira/browse/FLINK-25097?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17452907#comment-17452907 ]
Chu Xue commented on FLINK-25097:
---------------------------------

[~jingzhang] Sorry for the late reply; I have been busy these days. First, in theory a RexInputRef should never be passed to columnIntervalOfSinglePredicate. However, a previous optimization rule may strip the comparison (IS TRUE) from the condition, which produces a wrong ValueInterval for the join key when the predicate is pushed down. Next, considering that the evaluation result of the IS FALSE expression is null, we can return null directly when a RexInputRef is passed. If some optimization rule strips other conditions in the same way, so that another RexInputRef reaches columnIntervalOfSinglePredicate, the problem could recur; however, once null is returned the boundary of the predicate is maximized, and the SQL results stay consistent. Finally, we should fix the bug in the optimization rules themselves; the modification above is only a protective measure.

> Bug in inner join when the filter condition is boolean type
> -----------------------------------------------------------
>
>                 Key: FLINK-25097
>                 URL: https://issues.apache.org/jira/browse/FLINK-25097
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.12.2, 1.13.0, 1.14.0
>            Reporter: Chu Xue
>            Priority: Major
>         Attachments: ColumnIntervalUtil.scala, ConditionFalseResult.txt, ConditionTrueResult.txt, errorLog.txt
>
> When I test an inner join whose filter column is of BOOLEAN type, an error occurs during the SQL conversion process.
> The SQL is as follows:
> {code:java}
> source-1:
> "CREATE TABLE IF NOT EXISTS data_source (\n" +
> "  id INT,\n" +
> "  name STRING,\n" +
> "  sex boolean\n" +
> ") WITH (\n" +
> "  'connector' = 'datagen',\n" +
> "  'rows-per-second'='1',\n" +
> "  'fields.id.kind'='sequence',\n" +
> "  'fields.id.start'='1',\n" +
> "  'fields.id.end'='10',\n" +
> "  'fields.name.kind'='random',\n" +
> "  'fields.name.length'='10'\n" +
> ")";
>
> source-2:
> "CREATE TABLE IF NOT EXISTS info (\n" +
> "  id INT,\n" +
> "  name STRING,\n" +
> "  sex boolean\n" +
> ") WITH (\n" +
> "  'connector' = 'datagen',\n" +
> "  'rows-per-second'='1',\n" +
> "  'fields.id.kind'='sequence',\n" +
> "  'fields.id.start'='1',\n" +
> "  'fields.id.end'='10',\n" +
> "  'fields.name.kind'='random',\n" +
> "  'fields.name.length'='10'\n" +
> ")";
>
> sink:
> "CREATE TABLE IF NOT EXISTS print_sink ( \n" +
> "  id INT,\n" +
> "  name STRING,\n" +
> "  left_sex boolean,\n" +
> "  right_sex boolean\n" +
> ") WITH (\n" +
> "  'connector' = 'print'\n" +
> ")";
>
> SQL-1:
> "insert into print_sink" +
> " select l.id, l.name, l.sex, r.sex from data_source l " +
> "inner join info r on l.sex = r.sex where l.sex is true";{code}
> The SQL fails with:
> {code:java}
> The program finished with the following exception:
> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Error while applying rule FlinkLogicalCalcConverter(in:NONE,out:LOGICAL), args [rel#135:LogicalCalc.NONE.any.None: 0.[NONE].[NONE](input=RelSubset#115,expr#0..5={inputs},proj#0..2={exprs},3=$t5)]
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:366)
>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219)
>     at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
>     at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
>     at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
>     at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
>     at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at javax.security.auth.Subject.doAs(Subject.java:422)
>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844)
>     at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>     at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
> Caused by: java.lang.RuntimeException: Error while applying rule FlinkLogicalCalcConverter(in:NONE,out:LOGICAL), args [rel#135:LogicalCalc.NONE.any.None: 0.[NONE].[NONE](input=RelSubset#115,expr#0..5={inputs},proj#0..2={exprs},3=$t5)]
>     at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:256)
>     at org.apache.calcite.plan.volcano.IterativeRuleDriver.drive(IterativeRuleDriver.java:58)
>     at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:510)
>     at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:312)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
>     at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>     at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>     at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
>     at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
>     at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163)
>     at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:79)
>     at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
>     at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:287)
>     at org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:100)
>     at org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:42)
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.explainSql(TableEnvironmentImpl.java:625)
>     at com.xue.testSql.main(testSql.java:60)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:349)
>     ... 11 more
> Caused by: java.lang.RuntimeException: Error occurred while applying rule FlinkLogicalCalcConverter(in:NONE,out:LOGICAL)
>     at org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:161)
>     at org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:268)
>     at org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:283)
>     at org.apache.calcite.rel.convert.ConverterRule.onMatch(ConverterRule.java:169)
>     at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:229)
>     ... 39 more
> Caused by: java.lang.ClassCastException: org.apache.calcite.rex.RexInputRef cannot be cast to org.apache.calcite.rex.RexCall
>     at org.apache.flink.table.planner.plan.utils.ColumnIntervalUtil$.org$apache$flink$table$planner$plan$utils$ColumnIntervalUtil$$columnIntervalOfSinglePredicate(ColumnIntervalUtil.scala:236)
>     at org.apache.flink.table.planner.plan.utils.ColumnIntervalUtil$$anonfun$5$$anonfun$6.apply(ColumnIntervalUtil.scala:223)
>     at org.apache.flink.table.planner.plan.utils.ColumnIntervalUtil$$anonfun$5$$anonfun$6.apply(ColumnIntervalUtil.scala:223)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>     at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>     at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>     at org.apache.flink.table.planner.plan.utils.ColumnIntervalUtil$$anonfun$5.apply(ColumnIntervalUtil.scala:223)
>     at org.apache.flink.table.planner.plan.utils.ColumnIntervalUtil$$anonfun$5.apply(ColumnIntervalUtil.scala:221)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>     at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>     at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>     at org.apache.flink.table.planner.plan.utils.ColumnIntervalUtil$.getColumnIntervalWithFilter(ColumnIntervalUtil.scala:221)
>     at org.apache.flink.table.planner.plan.metadata.FlinkRelMdColumnInterval.getColumnIntervalOfCalc(FlinkRelMdColumnInterval.scala:227)
>     at org.apache.flink.table.planner.plan.metadata.FlinkRelMdColumnInterval.getColumnInterval(FlinkRelMdColumnInterval.scala:203)
>     at GeneratedMetadataHandler_ColumnInterval.getColumnInterval_$(Unknown Source)
>     at GeneratedMetadataHandler_ColumnInterval.getColumnInterval(Unknown Source)
>     at org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery.getColumnInterval(FlinkRelMetadataQuery.java:112)
>     at org.apache.flink.table.planner.plan.metadata.FlinkRelMdColumnInterval.getColumnInterval(FlinkRelMdColumnInterval.scala:801)
>     at GeneratedMetadataHandler_ColumnInterval.getColumnInterval_$(Unknown Source)
>     at GeneratedMetadataHandler_ColumnInterval.getColumnInterval(Unknown Source)
>     at org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery.getColumnInterval(FlinkRelMetadataQuery.java:112)
>     at org.apache.flink.table.planner.plan.metadata.FlinkRelMdRowCount$$anonfun$1.apply(FlinkRelMdRowCount.scala:308)
>     at org.apache.flink.table.planner.plan.metadata.FlinkRelMdRowCount$$anonfun$1.apply(FlinkRelMdRowCount.scala:306)
>     at scala.collection.IndexedSeqOptimized$class.prefixLengthImpl(IndexedSeqOptimized.scala:38)
>     at scala.collection.IndexedSeqOptimized$class.exists(IndexedSeqOptimized.scala:46)
>     at scala.collection.mutable.ArrayBuffer.exists(ArrayBuffer.scala:48)
>     at org.apache.flink.table.planner.plan.metadata.FlinkRelMdRowCount.getEquiInnerJoinRowCount(FlinkRelMdRowCount.scala:306)
>     at org.apache.flink.table.planner.plan.metadata.FlinkRelMdRowCount.getRowCount(FlinkRelMdRowCount.scala:268)
>     at GeneratedMetadataHandler_RowCount.getRowCount_$(Unknown Source)
>     at GeneratedMetadataHandler_RowCount.getRowCount(Unknown Source)
>     at org.apache.calcite.rel.metadata.RelMetadataQuery.getRowCount(RelMetadataQuery.java:212)
>     at org.apache.flink.table.planner.plan.metadata.FlinkRelMdRowCount.getRowCount(FlinkRelMdRowCount.scala:410)
>     at GeneratedMetadataHandler_RowCount.getRowCount_$(Unknown Source)
>     at GeneratedMetadataHandler_RowCount.getRowCount(Unknown Source)
>     at org.apache.calcite.rel.metadata.RelMetadataQuery.getRowCount(RelMetadataQuery.java:212)
>     at org.apache.calcite.rel.metadata.RelMdUtil.estimateFilteredRows(RelMdUtil.java:766)
>     at org.apache.calcite.rel.metadata.RelMdUtil.estimateFilteredRows(RelMdUtil.java:761)
>     at org.apache.flink.table.planner.plan.metadata.FlinkRelMdRowCount.getRowCount(FlinkRelMdRowCount.scala:62)
>     at GeneratedMetadataHandler_RowCount.getRowCount_$(Unknown Source)
>     at GeneratedMetadataHandler_RowCount.getRowCount(Unknown Source)
>     at org.apache.calcite.rel.metadata.RelMetadataQuery.getRowCount(RelMetadataQuery.java:212)
>     at org.apache.flink.table.planner.plan.nodes.common.CommonCalc.computeSelfCost(CommonCalc.scala:59)
>     at org.apache.flink.table.planner.plan.metadata.FlinkRelMdNonCumulativeCost.getNonCumulativeCost(FlinkRelMdNonCumulativeCost.scala:41)
>     at GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost_$(Unknown Source)
>     at GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost(Unknown Source)
>     at org.apache.calcite.rel.metadata.RelMetadataQuery.getNonCumulativeCost(RelMetadataQuery.java:288)
>     at org.apache.calcite.plan.volcano.VolcanoPlanner.getCost(VolcanoPlanner.java:705)
>     at org.apache.calcite.plan.volcano.RelSubset.propagateCostImprovements0(RelSubset.java:415)
>     at org.apache.calcite.plan.volcano.RelSubset.propagateCostImprovements(RelSubset.java:398)
>     at org.apache.calcite.plan.volcano.VolcanoPlanner.addRelToSet(VolcanoPlanner.java:1268)
>     at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1227)
>     at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:589)
>     at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:604)
>     at org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:148)
>     ... 43 more
> {code}
> I tried several Flink versions (1.12.2, 1.13.3 and 1.14.0); this error occurs in all of them during executeSql. There is one small difference: in 1.12.2 the above errors are already reported when explaining SQL-1, while the other versions explain SQL-1 successfully.
>
> Then I modified SQL-1, changing the predicate on l.sex from IS TRUE to IS FALSE:
> {code:java}
> SQL-2:
> insert into print_sink select l.id, l.name, l.sex, r.sex from data_source l inner join info r on l.sex = r.sex where l.sex is false{code}
> SQL-2 runs normally.
>
> I attempted to modify org.apache.flink.table.planner.plan.utils.ColumnIntervalUtil#columnIntervalOfSinglePredicate like this:
> {code:java}
> private def columnIntervalOfSinglePredicate(condition: RexNode): ValueInterval = {
>   // Add a guard: return null if the condition is not a RexCall
>   if (!condition.isInstanceOf[RexCall]) {
>     return null
>   }
>   val convertedCondition = condition.asInstanceOf[RexCall]
>   ...
> }{code}
> With this change, both SQL-1 and SQL-2 run normally. The results are in [#ConditionTrueResult.txt] and [#ConditionFalseResult.txt].

-- This message was sent by Atlassian Jira (v8.20.1#820001)
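To illustrate the protective check outside of Flink's codebase, here is a minimal, self-contained Java sketch. The `RexNode`, `RexInputRef`, and `RexCall` classes below are simplified stand-ins for the Calcite types (not the real API), and the interval derivation itself is omitted; only the guard from the proposed patch is shown.

```java
// Simplified stand-ins for the Calcite RexNode hierarchy (illustration only).
interface RexNode {}

class RexInputRef implements RexNode {
    final int index;
    RexInputRef(int index) { this.index = index; }
}

class RexCall implements RexNode {
    final String operator;
    RexCall(String operator) { this.operator = operator; }
}

public class ColumnIntervalGuardSketch {
    // Stand-in for the interval computation; null means "interval unknown".
    static String columnIntervalOfSinglePredicate(RexNode condition) {
        // Guard from the proposed patch: a bare input reference (left behind
        // when a rule strips e.g. `l.sex IS TRUE` down to a plain column
        // reference) is not a RexCall, so return null instead of casting
        // blindly and throwing ClassCastException.
        if (!(condition instanceof RexCall)) {
            return null;
        }
        RexCall call = (RexCall) condition;
        // The real interval derivation on `call` would go here; a placeholder
        // string stands in for the computed ValueInterval.
        return "interval(" + call.operator + ")";
    }

    public static void main(String[] args) {
        System.out.println(columnIntervalOfSinglePredicate(new RexInputRef(2)));
        System.out.println(columnIntervalOfSinglePredicate(new RexCall("=")));
    }
}
```

Because a null ("unknown") interval only widens the estimated bounds, this guard cannot change query results; as the comment above notes, the real fix still belongs in the optimization rule that drops the IS TRUE comparison.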