[ 
https://issues.apache.org/jira/browse/FLINK-38397?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ahmad Humayun updated FLINK-38397:
----------------------------------
    Summary: Stale Sort Trait causes BatchPhysicalSort.explainTerms to read a 
non-existent index  (was: Runtime Exception in Planner)

> Stale Sort Trait causes BatchPhysicalSort.explainTerms to read a non-existent 
> index
> -----------------------------------------------------------------------------------
>
>                 Key: FLINK-38397
>                 URL: https://issues.apache.org/jira/browse/FLINK-38397
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 2.0.0
>         Environment: python 3.10.12
> apache-flink 2.0.0
>            Reporter: Ahmad Humayun
>            Priority: Major
>
>  
> The following program makes the planner throw a RuntimeException. My best 
> guess is that the planner/optimizer miscounts the columns available at some 
> point in the plan.
> {code:python}
> from pyflink.table import EnvironmentSettings, TableEnvironment, Table
> from pyflink.common import Configuration
> from pyflink.table.expressions import col
>  
> cfg = Configuration()
>  
> settings = (
>   EnvironmentSettings.new_instance()
>   .in_batch_mode()
>   .with_configuration(cfg)
>   .build()
> )
>  
> table_env = TableEnvironment.create(settings)
>  
> # =============
> data = [
>    (1, "AAAAAAAABAAAAAAA", "Jimmy Allen", "3rd", -5.00),
>    (2, "AAAAAAAACAAAAAAA", "Jimmy Bullock", "Cedar Spruce", -5.00),
>    (3, "AAAAAAAACAAAAAAA", "Floyd Christian", "8th", -5.00),
>    (4, "AAAAAAAAEAAAAAAA", "James Lachance", "6th", -5.00),
>    (5, "AAAAAAAAEAAAAAAA", "James Lachance", "9th 12th", -5.00),
>    (6, "AAAAAAAAEAAAAAAA", "Joaquin Washington", "Adams", -5.00),
>    (7, "AAAAAAAAHAAAAAAA", "Michael Burton", "3rd", -5.00)
> ]
>  
> schema = [
>    "s_store_sk",
>    "s_store_id",
>    "s_manager",
>    "s_street_name",
>    "s_gmt_offset"
> ]
> # =====================
>  
> source_table = table_env.from_elements(
>   data,
>   schema=schema
> )
>  
> ordered = source_table.order_by(col('s_manager'))
> aggregated = (
>   ordered.group_by(col('s_street_name'))
>   .select(col('s_gmt_offset').count.alias('s_gmt_offset'))
> )
> print(aggregated.explain())
> {code}
> This code throws the following error:
> {code:java}
> py4j.protocol.Py4JJavaError: An error occurred while calling o70.explain.
> : java.lang.RuntimeException: Error while applying rule FlinkExpandConversionRule, args [rel#219:AbstractConverter.BATCH_PHYSICAL.hash[0]true.[2](input=RelSubset#217,convention=BATCH_PHYSICAL,FlinkRelDistributionTraitDef=hash[0]true,sort=[2]), rel#216:BatchPhysicalLocalHashAggregate.BATCH_PHYSICAL.any.[2](input=RelSubset#215,groupBy=s_street_name,select=s_street_name, Partial_COUNT(s_gmt_offset) AS count$0)]
>         at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:250)
>         at org.apache.calcite.plan.volcano.IterativeRuleDriver.drive(IterativeRuleDriver.java:59)
>         at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:523)
>         at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:317)
>         at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:62)
>         at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59)
>         at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
>         at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
>         at scala.collection.Iterator.foreach(Iterator.scala:943)
>         at scala.collection.Iterator.foreach$(Iterator.scala:943)
>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
>         at scala.collection.IterableLike.foreach(IterableLike.scala:74)
>         at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
>         at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
>         at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
>         at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
>         at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
>         at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:55)
>         at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeTree(BatchCommonSubGraphBasedOptimizer.scala:93)
>         at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeBlock(BatchCommonSubGraphBasedOptimizer.scala:58)
>         at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.$anonfun$doOptimize$1(BatchCommonSubGraphBasedOptimizer.scala:45)
>         at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.$anonfun$doOptimize$1$adapted(BatchCommonSubGraphBasedOptimizer.scala:45)
>         at scala.collection.immutable.List.foreach(List.scala:431)
>         at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.doOptimize(BatchCommonSubGraphBasedOptimizer.scala:45)
>         at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:87)
>         at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:390)
>         at org.apache.flink.table.planner.delegation.PlannerBase.getExplainGraphs(PlannerBase.scala:625)
>         at org.apache.flink.table.planner.delegation.BatchPlanner.explain(BatchPlanner.scala:149)
>         at org.apache.flink.table.planner.delegation.BatchPlanner.explain(BatchPlanner.scala:49)
>         at org.apache.flink.table.api.internal.TableEnvironmentImpl.explainInternal(TableEnvironmentImpl.java:753)
>         at org.apache.flink.table.api.internal.TableImpl.explain(TableImpl.java:482)
>         at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
>         at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.base/java.lang.reflect.Method.invoke(Method.java:569)
>         at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>         at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
>         at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
>         at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>         at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>         at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>         at java.base/java.lang.Thread.run(Thread.java:840)
> Caused by: java.lang.RuntimeException: Error occurred while applying rule FlinkExpandConversionRule
>         at org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:157)
>         at org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:273)
>         at org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:288)
>         at org.apache.flink.table.planner.plan.rules.physical.FlinkExpandConversionRule.satisfyTraitsBySelf(FlinkExpandConversionRule.scala:72)
>         at org.apache.flink.table.planner.plan.rules.physical.FlinkExpandConversionRule.onMatch(FlinkExpandConversionRule.scala:52)
>         at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:223)
>         ... 41 more
> Caused by: java.lang.ArrayIndexOutOfBoundsException: Index 2 out of bounds for length 2
>         at org.apache.flink.calcite.shaded.com.google.common.collect.RegularImmutableList.get(RegularImmutableList.java:77)
>         at org.apache.calcite.util.Util$TransformingList.get(Util.java:2794)
>         at scala.collection.convert.Wrappers$JListWrapper.apply(Wrappers.scala:100)
>         at org.apache.flink.table.planner.plan.utils.RelExplainUtil$.$anonfun$collationToString$1(RelExplainUtil.scala:83)
>         at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
>         at scala.collection.Iterator.foreach(Iterator.scala:943)
>         at scala.collection.Iterator.foreach$(Iterator.scala:943)
>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
>         at scala.collection.IterableLike.foreach(IterableLike.scala:74)
>         at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
>         at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
>         at scala.collection.TraversableLike.map(TraversableLike.scala:286)
>         at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
>         at scala.collection.AbstractTraversable.map(Traversable.scala:108)
>         at org.apache.flink.table.planner.plan.utils.RelExplainUtil$.collationToString(RelExplainUtil.scala:83)
>         at org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalSort.explainTerms(BatchPhysicalSort.scala:61)
>         at org.apache.calcite.rel.AbstractRelNode.getDigestItems(AbstractRelNode.java:414)
>         at org.apache.calcite.rel.AbstractRelNode.deepHashCode(AbstractRelNode.java:396)
>         at org.apache.calcite.rel.AbstractRelNode$InnerRelDigest.hashCode(AbstractRelNode.java:448)
>         at java.base/java.util.HashMap.hash(HashMap.java:338)
>         at java.base/java.util.HashMap.getNode(HashMap.java:568)
>         at java.base/java.util.HashMap.get(HashMap.java:556)
>         at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1289)
>         at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:598)
>         at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:613)
>         at org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:144)
>         ... 46 more
> {code}
> However, if I remove the columns that are irrelevant to the query, it works 
> without issues. That is, replace the data and schema with the following:
>  
> {code:python}
> data = [
>    ("Jimmy Allen", "3rd", -5.00),
>    ("Jimmy Bullock", "Cedar Spruce", -5.00),
>    ("Floyd Christian", "8th", -5.00),
>    ("James Lachance", "6th", -5.00),
>    ("James Lachance", "9th 12th", -5.00),
>    ("Joaquin Washington", "Adams", -5.00),
>    ("Michael Burton", "3rd", -5.00)
> ] 
> schema = [
>    "s_manager",
>    "s_street_name",
>    "s_gmt_offset"
> ]
> {code}
>  
>  
>  
>  
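
One possible reading of the trace, sketched in plain Python with no Flink dependency: the sort trait `[2]` seems to refer to `s_manager` by its position in the five-column source row, while the local aggregate it is applied to emits only two fields (`s_street_name`, `count$0`). The helper names below are hypothetical and only model an index-to-name lookup like the one that fails in `RelExplainUtil.collationToString`; this is an assumption drawn from the trace, not the planner's actual code.

```python
from typing import List, Sequence


def collation_to_string(sort_field_indices: Sequence[int], field_names: List[str]) -> str:
    """Render sort keys by name by looking each index up in the row's field list."""
    return ", ".join(field_names[i] for i in sort_field_indices)


def stale_collation_outcome(schema: List[str], sort_column: str, agg_output: List[str]) -> str:
    """Resolve the sort index against the source schema, then (incorrectly)
    apply it to the aggregate's output row, as a stale trait would."""
    sort_index = schema.index(sort_column)
    try:
        return collation_to_string([sort_index], agg_output)
    except IndexError:
        return f"IndexError: index {sort_index} out of bounds for length {len(agg_output)}"


# Fields emitted by the local hash aggregate, taken from the trace.
agg_output = ["s_street_name", "count$0"]

five_columns = ["s_store_sk", "s_store_id", "s_manager", "s_street_name", "s_gmt_offset"]
three_columns = ["s_manager", "s_street_name", "s_gmt_offset"]

print(stale_collation_outcome(five_columns, "s_manager", agg_output))
print(stale_collation_outcome(three_columns, "s_manager", agg_output))
```

Under this reading, the reduced schema would avoid the exception only because `s_manager` moves to index 0, which happens to be in bounds for the two-field row. The stale index would still resolve to the wrong column (`s_street_name`), so the three-column case may be hiding the same problem rather than avoiding it.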



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
