Wrapping this up -- fix is in 2.1.0 and has been backported to the 2.0.x
branch, as well.
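
For anyone landing on this thread later, here is a rough plain-Python model (not Spark code) of why the 2.0.2 plan returned nothing. It assumes the pushed-down filter list is applied as a conjunction, which matches the `&&` in the Filter node of the 2.0.2 plan quoted below. The rows mirror the fake data from the repro further down; the function names are made up for illustration.

```python
# Plain-Python model of the two pushed-filter lists; this is not Spark code.
# Rows mirror the repro data in the thread: (id, username).
rows = [(1, "fred"), (2, "amy"), (3, None)]


def is_not_null(row):
    # Models IsNotNull(username)
    return row[1] is not None


def not_is_not_null(row):
    # Models Not(IsNotNull(username))
    return not is_not_null(row)


def apply_pushed_filters(rows, filters):
    # Assumption: pushed filters are ANDed, so a row must pass every one.
    return [r for r in rows if all(f(r) for f in filters)]


# 1.6.2 and 2.1.0 plans: PushedFilters: [Not(IsNotNull(username))]
expected = apply_pushed_filters(rows, [not_is_not_null])

# 2.0.2 plan: PushedFilters: [IsNotNull(username), Not(IsNotNull(username))]
buggy = apply_pushed_filters(rows, [is_not_null, not_is_not_null])

print(expected)
print(buggy)
```

The second list can never match a row, whatever the data, since each row either has a username or does not.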

On Mon, Feb 13, 2017 at 6:41 PM, Everett Anderson <ever...@nuna.com> wrote:

> Went ahead and opened
>
> https://issues.apache.org/jira/browse/SPARK-19586
>
> though I'd generally expect to just close it as fixed in 2.1.0 and roll on.
>
> On Sat, Feb 11, 2017 at 5:01 PM, Everett Anderson <ever...@nuna.com>
> wrote:
>
>> On the plus side, looks like this may be fixed in 2.1.0:
>>
>> == Physical Plan ==
>> *HashAggregate(keys=[], functions=[count(1)])
>> +- Exchange SinglePartition
>>    +- *HashAggregate(keys=[], functions=[partial_count(1)])
>>       +- *Project
>>          +- *Filter NOT isnotnull(username#14)
>>             +- *FileScan parquet [username#14] Batched: true, Format:
>> Parquet, Location: InMemoryFileIndex[file:/tmp/test_table],
>> PartitionFilters: [], PushedFilters: [Not(IsNotNull(username))],
>> ReadSchema: struct<username:string>
>>
>>
>>
>> On Fri, Feb 10, 2017 at 11:26 AM, Everett Anderson <ever...@nuna.com>
>> wrote:
>>
>>> Bumping this thread.
>>>
>>> Translating "where not(username is not null)" into a filter of
>>> [IsNotNull(username), Not(IsNotNull(username))] seems like a rather
>>> severe bug.
>>>
>>> Spark 1.6.2:
>>>
>>> explain select count(*) from parquet_table where not( username is not
>>> null)
>>>
>>> == Physical Plan ==
>>> TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], output=[_c0#1822L])
>>> +- TungstenExchange SinglePartition, None
>>>    +- TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[count#1825L])
>>>       +- Project
>>>          +- Filter NOT isnotnull(username#1590)
>>>             +- Scan ParquetRelation[username#1590] InputPaths: <path to parquet>, PushedFilters: [Not(IsNotNull(username))]
>>>
>>> Spark 2.0.2:
>>>
>>> explain select count(*) from parquet_table where not( username is not
>>> null)
>>>
>>> == Physical Plan ==
>>> *HashAggregate(keys=[], functions=[count(1)])
>>> +- Exchange SinglePartition
>>>    +- *HashAggregate(keys=[], functions=[partial_count(1)])
>>>       +- *Project
>>>          +- *Filter (isnotnull(username#35) && NOT isnotnull(username#35))
>>>             +- *BatchedScan parquet default.<hive table name>[username#35] Format: ParquetFormat, InputPaths: <path to parquet>, PartitionFilters: [], PushedFilters: [IsNotNull(username), Not(IsNotNull(username))], ReadSchema: struct<username:string>
>>>
>>> Example to generate the above:
>>>
>>> // Create some fake data
>>>
>>> import org.apache.spark.sql.Row
>>> import org.apache.spark.sql.Dataset
>>> import org.apache.spark.sql.types._
>>>
>>> val rowsRDD = sc.parallelize(Seq(
>>>     Row(1, "fred"),
>>>     Row(2, "amy"),
>>>     Row(3, null)))
>>>
>>> val schema = StructType(Seq(
>>>     StructField("id", IntegerType, nullable = true),
>>>     StructField("username", StringType, nullable = true)))
>>>
>>> val data = sqlContext.createDataFrame(rowsRDD, schema)
>>>
>>> val path = "SOME PATH HERE"
>>>
>>> data.write.mode("overwrite").parquet(path)
>>>
>>> val testData = sqlContext.read.parquet(path)
>>>
>>> testData.registerTempTable("filter_test_table")
>>>
>>>
>>> %sql
>>> explain select count(*) from filter_test_table where not( username is
>>> not null)
>>>
>>>
>>> On Wed, Feb 8, 2017 at 4:56 PM, Alexi Kostibas <
>>> akosti...@nuna.com.invalid> wrote:
>>>
>>>> Hi,
>>>>
>>>> I have an application where I’m filtering data with SparkSQL with
>>>> simple WHERE clauses. I also want the ability to show the unmatched rows
>>>> for any filter, and so am wrapping the previous clause in `NOT()` to get
>>>> the inverse. Example:
>>>>
>>>> Filter:  username is not null
>>>> Inverse filter:  NOT(username is not null)
>>>>
>>>> This worked fine in Spark 1.6. After upgrading to Spark 2.0.2, the
>>>> inverse filter always returns zero results. It looks like this is a problem
>>>> with how the filter is getting pushed down to Parquet. Specifically, the
>>>> pushdown includes both the “is not null” filter, AND “not(is not null)”,
>>>> which would obviously result in zero matches. An example below:
>>>>
>>>> pyspark:
>>>> > x = spark.sql('select my_id from my_table where username is not null')
>>>> > y = spark.sql('select my_id from my_table where not(username is not null)')
>>>>
>>>> > x.explain()
>>>> == Physical Plan ==
>>>> *Project [my_id#6L]
>>>> +- *Filter isnotnull(username#91)
>>>>    +- *BatchedScan parquet default.my_table[my_id#6L,username#91]
>>>>        Format: ParquetFormat, InputPaths: s3://my-path/my.parquet,
>>>>        PartitionFilters: [], PushedFilters: [IsNotNull(username)],
>>>>        ReadSchema: struct<my_id:bigint,username:string>
>>>> > y.explain()
>>>> == Physical Plan ==
>>>> *Project [my_id#6L]
>>>> +- *Filter (isnotnull(username#91) && NOT isnotnull(username#91))
>>>>    +- *BatchedScan parquet default.my_table[my_id#6L,username#91]
>>>>        Format: ParquetFormat, InputPaths: s3://my-path/my.parquet,
>>>>        PartitionFilters: [],
>>>>        PushedFilters: [IsNotNull(username), Not(IsNotNull(username))],
>>>>        ReadSchema: struct<my_id:bigint,username:string>
>>>>
>>>> Presently I’m working around this by using the new functionality of NOT
>>>> EXISTS in Spark 2, but that seems like overkill.
>>>>
>>>> Any help appreciated.
>>>>
>>>>
>>>> *Alexi Kostibas*Engineering
>>>> *Nuna*
>>>> 650 Townsend Street, Suite 425
>>>> San Francisco, CA 94103
>>>>
>>>>
>>>
>>
>
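
If it helps anyone checking whether their own build is affected, the contradictory pair in the plans quoted above can be spotted mechanically. This is only a sketch in plain Python that scans plan text pasted from explain output. Both helper names are hypothetical, and it assumes no filter value contains a `]` and that any email quote markers have been stripped from the pasted text.

```python
import re


def split_top_level(text):
    """Split on commas that are not nested inside parentheses."""
    parts, depth, current = [], 0, []
    for ch in text:
        if ch == "(":
            depth += 1
        elif ch == ")":
            depth -= 1
        if ch == "," and depth == 0:
            parts.append("".join(current).strip())
            current = []
        else:
            current.append(ch)
    tail = "".join(current).strip()
    if tail:
        parts.append(tail)
    return parts


def contradictory_pushed_filters(plan_text):
    """Return each filter F where F and Not(F) share one PushedFilters list."""
    found = []
    for match in re.finditer(r"PushedFilters: \[(.*?)\]", plan_text, re.DOTALL):
        # Collapse whitespace so filters wrapped across lines still compare equal.
        filters = [" ".join(f.split()) for f in split_top_level(match.group(1))]
        found.extend(f for f in filters if f"Not({f})" in filters)
    return found


# PushedFilters fragments copied from the 2.0.2 and 2.1.0 plans in this thread.
plan_202 = "PushedFilters: [IsNotNull(username), Not(IsNotNull(username))],"
plan_210 = "PushedFilters: [Not(IsNotNull(username))],"

print(contradictory_pushed_filters(plan_202))
print(contradictory_pushed_filters(plan_210))
```

A non-empty result means the scan is being asked for rows that satisfy a filter and its negation at once, which is the symptom reported in this thread.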
