Herman van Hövell created SPARK-51356:
-----------------------------------------

             Summary: FilterExec incorrectly reorders IsNotNull predicates for nested access
                 Key: SPARK-51356
                 URL: https://issues.apache.org/jira/browse/SPARK-51356
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 3.5.5, 4.0.0
            Reporter: Herman van Hövell


In whole-stage code generation, FilterExec reorders IsNotNull predicates so that we only materialize fields when we have to. Unfortunately, this does not work correctly when the predicates access nested data.

For example:

 
{noformat}
case class A(b: B = null)
case class B(c: C = null)
case class C(d: Int)

val data = Seq(
    A(null),
    A(B(null)), // This causes the failure
    A(B(C(0))),
    A(B(C(1))))
val aValues = spark.createDataset(data).map(i => i) // The map prevents the local relation from being optimized away.
val bValues = aValues.where(col("b").isNotNull).select(col("b").as[B])
val isDZero = udf((c: C) => c.d == 0)

// Select all from b where c.d == 0
val result = bValues
  .filter(col("c").isNotNull)
  .filter(not(isDZero(col("c"))))

// KABOOM!
result.show(){noformat}
 
This yields the following plan:
{noformat}
== Parsed Logical Plan ==
'Filter '`!`(UDF('c))
+- Filter isnotnull(c#18)
   +- Project [b#11.c AS c#18]
      +- Project [b#11]
         +- Filter isnotnull(b#11)
            +- SerializeFromObject [if (isnull(invoke(knownnotnull(assertnotnull(input[0, $line15.$read$$iw$A, true])).b()))) null else named_struct(c, if (isnull(invoke(knownnotnull(invoke(knownnotnull(assertnotnull(input[0, $line15.$read$$iw$A, true])).b())).c()))) null else named_struct(d, invoke(knownnotnull(invoke(knownnotnull(invoke(knownnotnull(assertnotnull(input[0, $line15.$read$$iw$A, true])).b())).c())).d()))) AS b#11]
               +- MapElements $line15.$read$$iw$$Lambda$3174/0x000000c801ba7518@70b62924, class $line15.$read$$iw$A, [StructField(b,StructType(StructField(c,StructType(StructField(d,IntegerType,false)),true)),true)], obj#9: $line15.$read$$iw$A
                  +- DeserializeToObject newInstance(class $line15.$read$$iw$A), obj#7: $line15.$read$$iw$A
                     +- LocalRelation [b#1]

== Analyzed Logical Plan ==
c: struct<d:int>
Filter NOT UDF(c#18)
+- Filter isnotnull(c#18)
   +- Project [b#11.c AS c#18]
      +- Project [b#11]
         +- Filter isnotnull(b#11)
            +- SerializeFromObject [if (isnull(invoke(knownnotnull(assertnotnull(input[0, $line15.$read$$iw$A, true])).b()))) null else named_struct(c, if (isnull(invoke(knownnotnull(invoke(knownnotnull(assertnotnull(input[0, $line15.$read$$iw$A, true])).b())).c()))) null else named_struct(d, invoke(knownnotnull(invoke(knownnotnull(invoke(knownnotnull(assertnotnull(input[0, $line15.$read$$iw$A, true])).b())).c())).d()))) AS b#11]
               +- MapElements $line15.$read$$iw$$Lambda$3174/0x000000c801ba7518@70b62924, class $line15.$read$$iw$A, [StructField(b,StructType(StructField(c,StructType(StructField(d,IntegerType,false)),true)),true)], obj#9: $line15.$read$$iw$A
                  +- DeserializeToObject newInstance(class $line15.$read$$iw$A), obj#7: $line15.$read$$iw$A
                     +- LocalRelation [b#1]

== Optimized Logical Plan ==
Project [b#11.c AS c#18]
+- Filter (isnotnull(b#11) AND (isnotnull(b#11.c) AND NOT UDF(b#11.c)))
   +- SerializeFromObject [if (isnull(invoke(knownnotnull(assertnotnull(input[0, $line15.$read$$iw$A, true])).b()))) null else named_struct(c, if (isnull(invoke(knownnotnull(invoke(knownnotnull(assertnotnull(input[0, $line15.$read$$iw$A, true])).b())).c()))) null else named_struct(d, invoke(knownnotnull(invoke(knownnotnull(invoke(knownnotnull(assertnotnull(input[0, $line15.$read$$iw$A, true])).b())).c())).d()))) AS b#11]
      +- MapElements $line15.$read$$iw$$Lambda$3174/0x000000c801ba7518@70b62924, class $line15.$read$$iw$A, [StructField(b,StructType(StructField(c,StructType(StructField(d,IntegerType,false)),true)),true)], obj#9: $line15.$read$$iw$A
         +- DeserializeToObject newInstance(class $line15.$read$$iw$A), obj#7: $line15.$read$$iw$A
            +- LocalRelation [b#1]

== Physical Plan ==
*(1) Project [b#11.c AS c#18]
+- *(1) Filter (isnotnull(b#11) AND (isnotnull(b#11.c) AND NOT UDF(b#11.c)))
   +- *(1) SerializeFromObject [if (isnull(invoke(knownnotnull(assertnotnull(input[0, $line15.$read$$iw$A, true])).b()))) null else named_struct(c, if (isnull(invoke(knownnotnull(invoke(knownnotnull(assertnotnull(input[0, $line15.$read$$iw$A, true])).b())).c()))) null else named_struct(d, invoke(knownnotnull(invoke(knownnotnull(invoke(knownnotnull(assertnotnull(input[0, $line15.$read$$iw$A, true])).b())).c())).d()))) AS b#11]
      +- *(1) MapElements $line15.$read$$iw$$Lambda$3174/0x000000c801ba7518@70b62924, obj#9: $line15.$read$$iw$A
         +- *(1) DeserializeToObject newInstance(class $line15.$read$$iw$A), obj#7: $line15.$read$$iw$A
            +- *(1) LocalTableScan [b#1]
{noformat}
The optimizer has collapsed the two filters and pushed the project down. We now have a single filter: {{{}isnotnull(b#11) AND (isnotnull(b#11.c) AND NOT UDF(b#11.c)){}}}. In WholeStageCodegen the second IsNotNull check is reordered to after the UDF invocation. As a result, we pass a null into a UDF that the check was supposed to guard. In this case the UDF is not even invoked, because the encoder fails first (which is also a bug). The IsNotNull reordering (and this bug) has been there since we introduced whole-stage code generation.
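The hazard comes down to the evaluation order of a short-circuiting conjunction. A plain-Scala sketch, with no Spark involved ({{isDZero}} here mirrors the UDF from the repro and assumes a non-null input):

```scala
// Mirrors the UDF from the repro: it assumes its input is non-null.
case class C(d: Int)
val isDZero: C => Boolean = c => c.d == 0

val c: C = null // the value that A(B(null)) produces for column c

// Intended order: the null guard runs first, && short-circuits,
// and isDZero never sees the null.
val guarded = (c != null) && !isDZero(c) // false, no exception

// Reordered: isDZero runs before the guard and dereferences null.
val reorderedThrows =
  try { !isDZero(c) && (c != null) }
  catch { case _: NullPointerException => true } // NPE is thrown
```

The optimized plan produces the second ordering for {{isnotnull(b#11.c)}} and {{UDF(b#11.c)}} once codegen moves the guard after the UDF call.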
 
This particular case triggers in Spark 4 but not in earlier versions, because we improved typed select. Typed select used to use a map to flatten the result when the typed column returned a struct; now we use star expansion, which is cheaper. That allows the optimizer to be more aggressive and fuse the two filters. Reverting the change would fix this particular scenario; however, the bug is easy to trigger in older versions with {{{}val bValues = aValues.where(col("b").isNotNull).select(col("b.*")).as[B]{}}}.
 
This problem can be mitigated either by disabling whole-stage code generation ({{{}spark.conf.set("spark.sql.codegen.wholeStage", false){}}}) or by introducing an optimization barrier (e.g. {{{}val bValues = aValues.where(col("b").isNotNull).map(_.b){}}}).
 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
