[ https://issues.apache.org/jira/browse/SPARK-51356?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Herman van Hövell updated SPARK-51356:
--------------------------------------
Description:

In whole-stage code generation, FilterExec reorders IsNotNull predicates to make sure we only materialize fields when we have to. Unfortunately, this does not work correctly when the predicates access nested data. For example:

{noformat}
case class A(b: B = null)
case class B(c: C = null)
case class C(d: Int)

val data = Seq(
  A(null),
  A(B(null)), // This causes the failure
  A(B(C(0))),
  A(B(C(1))))

// The map is needed to keep the local relation from being optimized away.
val aValues = spark.createDataset(data).map(i => i)
val bValues = aValues.where(col("b").isNotNull).select(col("b").as[B])

val isDZero = udf((c: C) => c.d == 0)

// Select all rows from b where c.d == 0
val result = bValues
  .filter(col("c").isNotNull)
  .filter(not(isDZero(col("c"))))

// KABOOM!
result.show()
{noformat}

This yields the following plan:

{noformat}
== Parsed Logical Plan ==
'Filter '`!`(UDF('c))
+- Filter isnotnull(c#18)
   +- Project [b#11.c AS c#18]
      +- Project [b#11]
         +- Filter isnotnull(b#11)
            +- SerializeFromObject [if (isnull(invoke(knownnotnull(assertnotnull(input[0, $line15.$read$$iw$A, true])).b()))) null else named_struct(c, if (isnull(invoke(knownnotnull(invoke(knownnotnull(assertnotnull(input[0, $line15.$read$$iw$A, true])).b())).c()))) null else named_struct(d, invoke(knownnotnull(invoke(knownnotnull(invoke(knownnotnull(assertnotnull(input[0, $line15.$read$$iw$A, true])).b())).c())).d()))) AS b#11]
               +- MapElements $line15.$read$$iw$$Lambda$3174/0x000000c801ba7518@70b62924, class $line15.$read$$iw$A, [StructField(b,StructType(StructField(c,StructType(StructField(d,IntegerType,false)),true)),true)], obj#9: $line15.$read$$iw$A
                  +- DeserializeToObject newInstance(class $line15.$read$$iw$A), obj#7: $line15.$read$$iw$A
                     +- LocalRelation [b#1]

== Analyzed Logical Plan ==
c: struct<d:int>
Filter NOT UDF(c#18)
+- Filter isnotnull(c#18)
   +- Project [b#11.c AS c#18]
      +- Project [b#11]
         +- Filter isnotnull(b#11)
            +- SerializeFromObject [if (isnull(invoke(knownnotnull(assertnotnull(input[0, $line15.$read$$iw$A, true])).b()))) null else named_struct(c, if (isnull(invoke(knownnotnull(invoke(knownnotnull(assertnotnull(input[0, $line15.$read$$iw$A, true])).b())).c()))) null else named_struct(d, invoke(knownnotnull(invoke(knownnotnull(invoke(knownnotnull(assertnotnull(input[0, $line15.$read$$iw$A, true])).b())).c())).d()))) AS b#11]
               +- MapElements $line15.$read$$iw$$Lambda$3174/0x000000c801ba7518@70b62924, class $line15.$read$$iw$A, [StructField(b,StructType(StructField(c,StructType(StructField(d,IntegerType,false)),true)),true)], obj#9: $line15.$read$$iw$A
                  +- DeserializeToObject newInstance(class $line15.$read$$iw$A), obj#7: $line15.$read$$iw$A
                     +- LocalRelation [b#1]

== Optimized Logical Plan ==
Project [b#11.c AS c#18]
+- Filter (isnotnull(b#11) AND (isnotnull(b#11.c) AND NOT UDF(b#11.c)))
   +- SerializeFromObject [if (isnull(invoke(knownnotnull(assertnotnull(input[0, $line15.$read$$iw$A, true])).b()))) null else named_struct(c, if (isnull(invoke(knownnotnull(invoke(knownnotnull(assertnotnull(input[0, $line15.$read$$iw$A, true])).b())).c()))) null else named_struct(d, invoke(knownnotnull(invoke(knownnotnull(invoke(knownnotnull(assertnotnull(input[0, $line15.$read$$iw$A, true])).b())).c())).d()))) AS b#11]
      +- MapElements $line15.$read$$iw$$Lambda$3174/0x000000c801ba7518@70b62924, class $line15.$read$$iw$A, [StructField(b,StructType(StructField(c,StructType(StructField(d,IntegerType,false)),true)),true)], obj#9: $line15.$read$$iw$A
         +- DeserializeToObject newInstance(class $line15.$read$$iw$A), obj#7: $line15.$read$$iw$A
            +- LocalRelation [b#1]

== Physical Plan ==
*(1) Project [b#11.c AS c#18]
+- *(1) Filter (isnotnull(b#11) AND (isnotnull(b#11.c) AND NOT UDF(b#11.c)))
   +- *(1) SerializeFromObject [if (isnull(invoke(knownnotnull(assertnotnull(input[0, $line15.$read$$iw$A, true])).b()))) null else named_struct(c, if (isnull(invoke(knownnotnull(invoke(knownnotnull(assertnotnull(input[0, $line15.$read$$iw$A, true])).b())).c()))) null else named_struct(d, invoke(knownnotnull(invoke(knownnotnull(invoke(knownnotnull(assertnotnull(input[0, $line15.$read$$iw$A, true])).b())).c())).d()))) AS b#11]
      +- *(1) MapElements $line15.$read$$iw$$Lambda$3174/0x000000c801ba7518@70b62924, obj#9: $line15.$read$$iw$A
         +- *(1) DeserializeToObject newInstance(class $line15.$read$$iw$A), obj#7: $line15.$read$$iw$A
            +- *(1) LocalTableScan [b#1]
{noformat}

The optimizer has collapsed the filters and moved the project. We are now left with a single filter, {{isnotnull(b#11) AND (isnotnull(b#11.c) AND NOT UDF(b#11.c))}}. In WholeStageCodegen the second IsNotNull check, {{isnotnull(b#11.c)}}, is moved after the UDF invocation. The problem is that we then pass a null into a UDF that the check was supposed to guard. In this case we never even invoke the UDF, because the encoder fails first (which is also a bug). The IsNotNull reordering (and this bug) has been there since we introduced whole-stage code generation.
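To make the reordering hazard concrete, here is a minimal plain-Scala sketch (an illustration only, not the actual generated code; it reuses the case classes from the repro, and a local {{isDZero}} stands in for the UDF body). The fused predicate is only null-safe while its conjuncts short-circuit left to right:

{noformat}
// Illustration (assumption: this mirrors the predicate order, not codegen).
def isDZero(c: C): Boolean = c.d == 0 // UDF body; throws NPE when c is null

// What the optimized filter means when evaluated left to right: the IsNotNull
// checks short-circuit, so null never reaches isDZero.
def guarded(b: B): Boolean =
  b != null && b.c != null && !isDZero(b.c)

// What the reordered whole-stage code effectively evaluates: the second
// IsNotNull check runs after the UDF, so b.c == null reaches isDZero.
def reordered(b: B): Boolean =
  b != null && !isDZero(b.c) && b.c != null // blows up for the B(null) row
{noformat}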
This particular case triggers in Spark 4, but not in earlier versions, because we improved typed select. Typed select used to use a map to flatten the result if the typed column returned a struct; it now uses star expansion (which is cheaper). This allows the optimizer to be more aggressive and fuse the two filters. Reverting that change would fix this particular scenario; however, it is easy to trigger the bug in older versions by using {{val bValues = aValues.where(col("b").isNotNull).select(col("b.*")).as[B]}}.

This problem can be mitigated by either disabling whole-stage code generation ({{spark.conf.set("spark.sql.codegen.wholeStage", false)}}), or by introducing an optimization barrier (e.g. {{val bValues = aValues.where(col("b").isNotNull).map(_.b)}}).
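Applied to the repro above, the two mitigations look as follows (a sketch; either one alone is sufficient, and all names come from the example):

{noformat}
// Mitigation 1: disable whole-stage code generation for the session, so
// FilterExec no longer reorders the IsNotNull checks.
spark.conf.set("spark.sql.codegen.wholeStage", false)

// Mitigation 2 (alternative): rebuild bValues through a typed map. The map
// acts as an optimization barrier, so the two filters can no longer be fused.
val bValues = aValues.where(col("b").isNotNull).map(_.b)

val isDZero = udf((c: C) => c.d == 0)
val result = bValues
  .filter(col("c").isNotNull)
  .filter(not(isDZero(col("c"))))
result.show() // should now succeed
{noformat}
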
> FilterExec incorrectly reorders IsNotNull predicates for nested access
> ----------------------------------------------------------------------
>
>                 Key: SPARK-51356
>                 URL: https://issues.apache.org/jira/browse/SPARK-51356
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 4.0.0, 3.5.5
>            Reporter: Herman van Hövell
>            Priority: Major
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org