Bumping this thread.

Translating "where not(username is not null)" into a filter of
[IsNotNull(username),
Not(IsNotNull(username))] seems like a rather severe bug.

Spark 1.6.2:

explain select count(*) from parquet_table where not( username is not null)

== Physical Plan ==
TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], output=[_c0#1822L])
+- TungstenExchange SinglePartition, None
   +- TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[count#1825L])
      +- Project
         +- Filter NOT isnotnull(username#1590)
            +- Scan ParquetRelation[username#1590] InputPaths: <path to parquet>, PushedFilters: [Not(IsNotNull(username))]

Spark 2.0.2:

explain select count(*) from parquet_table where not( username is not null)

== Physical Plan ==
*HashAggregate(keys=[], functions=[count(1)])
+- Exchange SinglePartition
   +- *HashAggregate(keys=[], functions=[partial_count(1)])
      +- *Project
         +- *Filter (isnotnull(username#35) && NOT isnotnull(username#35))
            +- *BatchedScan parquet default.<hive table name>[username#35] Format: ParquetFormat, InputPaths: <path to parquet>, PartitionFilters: [], PushedFilters: [IsNotNull(username), Not(IsNotNull(username))], ReadSchema: struct<username:string>
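
Until a fix lands, the simplest workaround seems to be rewriting the
predicate as the logically equivalent "username is null", which never
produces the double negation. A minimal sketch, assuming a Spark 2.x
session bound to spark:

// NOT(username IS NOT NULL) is equivalent to username IS NULL;
// stating it directly avoids the spurious IsNotNull filter being inferred.
spark.sql("select count(*) from parquet_table where username is null").show()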

Example to generate the above:

// Create some fake data

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

val rowsRDD = sc.parallelize(Seq(
    Row(1, "fred"),
    Row(2, "amy"),
    Row(3, null)))

val schema = StructType(Seq(
    StructField("id", IntegerType, nullable = true),
    StructField("username", StringType, nullable = true)))

val data = sqlContext.createDataFrame(rowsRDD, schema)

val path = "SOME PATH HERE"

// Round-trip through Parquet so the filter actually hits the Parquet source.
data.write.mode("overwrite").parquet(path)

val testData = sqlContext.read.parquet(path)

// Deprecated in 2.0; createOrReplaceTempView is the replacement there.
testData.registerTempTable("filter_test_table")


%sql
explain select count(*) from filter_test_table where not( username is not null)
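
For what it's worth, the same inverse filter expressed through the
DataFrame API should hit the same optimizer path, so I'd expect explain()
to show the same contradictory PushedFilters. A sketch against the
testData frame above:

import org.apache.spark.sql.functions.{col, not}

// The inverse filter expressed via the DataFrame API rather than SQL text.
testData.filter(not(col("username").isNotNull)).explain()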


On Wed, Feb 8, 2017 at 4:56 PM, Alexi Kostibas <akosti...@nuna.com.invalid>
wrote:

> Hi,
>
> I have an application where I’m filtering data with SparkSQL with simple
> WHERE clauses. I also want the ability to show the unmatched rows for any
> filter, and so am wrapping the previous clause in `NOT()` to get the
> inverse. Example:
>
> Filter:  username is not null
> Inverse filter:  NOT(username is not null)
>
> This worked fine in Spark 1.6. After upgrading to Spark 2.0.2, the inverse
> filter always returns zero results. It looks like this is a problem with
> how the filter is getting pushed down to Parquet. Specifically, the
> pushdown includes both the “is not null” filter, AND “not(is not null)”,
> which would obviously result in zero matches. An example below:
>
> pyspark:
> > x = spark.sql('select my_id from my_table where *username is not null*')
> > y = spark.sql('select my_id from my_table where not(*username is not null*)')
>
> > x.explain()
> == Physical Plan ==
> *Project [my_id#6L]
> +- *Filter isnotnull(username#91)
>    +- *BatchedScan parquet default.my_table[my_id#6L,username#91]
>        Format: ParquetFormat, InputPaths: s3://my-path/my.parquet,
>        PartitionFilters: [], PushedFilters: [IsNotNull(username)],
>        ReadSchema: struct<my_id:bigint,username:string>
> > y.explain()
> == Physical Plan ==
> *Project [my_id#6L]
> +- *Filter (isnotnull(username#91) && NOT isnotnull(username#91))
>    +- *BatchedScan parquet default.my_table[my_id#6L,username#91]
>        Format: ParquetFormat, InputPaths: s3://my-path/my.parquet,
>        PartitionFilters: [],
>        PushedFilters: [IsNotNull(username), Not(IsNotNull(username))],
>        ReadSchema: struct<my_id:bigint,username:string>
>
> Presently I’m working around this by using the new functionality of NOT
> EXISTS in Spark 2, but that seems like overkill.
>
> Any help appreciated.
>
>
> *Alexi Kostibas*
> Engineering
> *Nuna*
> 650 Townsend Street, Suite 425
> San Francisco, CA 94103
>
>
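
For reference, the NOT EXISTS rewrite mentioned above would look roughly
like the sketch below (table and column names are illustrative, and it
assumes my_id is unique). The plain "username is null" rewrite is much
simpler where it applies:

// Anti-join workaround: keep rows whose my_id never satisfies the
// positive filter. Names here are placeholders, not a tested recipe.
val inverse = spark.sql("""
  select my_id
  from my_table t
  where not exists (
    select 1 from my_table m
    where m.my_id = t.my_id
      and m.username is not null
  )
""")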
