Michael Patterson updated SPARK-18014:
--------------------------------------
Environment: Pyspark 2.0.0, Ipython 4.2 (was: Pyspark 2.0.1, Ipython 4.2)
> Filters are incorrectly being grouped together when there is processing in between
> -----------------------------------------------------------------------------------
>
> Key: SPARK-18014
> URL: https://issues.apache.org/jira/browse/SPARK-18014
> Project: Spark
> Issue Type: Bug
> Affects Versions: 2.0.1
> Environment: Pyspark 2.0.0, Ipython 4.2
> Reporter: Michael Patterson
> Priority: Minor
>
> I created a dataframe that needed to filter on columnA, create a new columnB
> by applying a user defined function to columnA, and then filter on columnB.
> However, the two filters are grouped together in the optimized execution
> plan, below the withColumn projection, so the UDF in the second filter is
> evaluated on rows that the first filter should have removed, causing errors
> from unexpected input to the UDF.
> Example code to reproduce:
> {code}
> import pyspark.sql.functions as F
> import pyspark.sql.types as T
> from functools import partial
>
> data = [{'input': 0}, {'input': 1}, {'input': 2}]
> input_df = sc.parallelize(data).toDF()
>
> # Lookup UDF: raises KeyError for keys absent from the dict (e.g. 0)
> my_dict = {1: 'first', 2: 'second'}
>
> def apply_dict(input_dict, value):
>     return input_dict[value]
>
> test_udf = F.udf(partial(apply_dict, my_dict), T.StringType())
>
> # Filter out input == 0, add 'output' via the UDF, then filter on 'output'
> test_df = (input_df.filter('input > 0')
>            .withColumn('output', test_udf('input'))
>            .filter(F.col('output').rlike('^s')))
> test_df.explain(True)
> {code}
> Execution plan:
> {code}
> == Analyzed Logical Plan ==
> input: bigint, output: string
> Filter output#4 RLIKE ^s
> +- Project [input#0L, partial(input#0L) AS output#4]
> +- Filter (input#0L > cast(0 as bigint))
> +- LogicalRDD [input#0L]
> == Optimized Logical Plan ==
> Project [input#0L, partial(input#0L) AS output#4]
> +- Filter ((isnotnull(input#0L) && (input#0L > 0)) && partial(input#0L) RLIKE ^s)
> +- LogicalRDD [input#0L]
> {code}
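> Because the RLIKE conjunct contains the Python UDF, the combined filter
> evaluates the UDF on the row where input is 0, which the first filter was
> supposed to remove. 0 is not a key in my_dict, so the lookup fails; the
> plain-Python equivalent of that lookup is:
> {code}
> # the combined filter applies the UDF to the unfiltered row input=0
> partial(apply_dict, my_dict)(0)  # raises KeyError: 0
> {code}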
> Executing test_df.show() after the above code in pyspark 2.0.1 yields:
> KeyError: 0
> Executing test_df.show() in pyspark 1.6.2 yields:
> {code}
> +-----+------+
> |input|output|
> +-----+------+
> | 2|second|
> +-----+------+
> {code}
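> A workaround that avoids the crash (a sketch only; it sidesteps the
> optimizer behavior rather than fixing it, and apply_dict_safe is just a
> name I made up) is to make the UDF return null for keys the first filter
> was meant to exclude:
> {code}
> # Defensive UDF: dict.get returns None for absent keys, so rows the first
> # filter should have removed yield null instead of raising KeyError.
> # rlike on null evaluates to null, so those rows are filtered out anyway.
> def apply_dict_safe(input_dict, value):
>     return input_dict.get(value)
>
> safe_udf = F.udf(partial(apply_dict_safe, my_dict), T.StringType())
> safe_df = (input_df.filter('input > 0')
>            .withColumn('output', safe_udf('input'))
>            .filter(F.col('output').rlike('^s')))
> safe_df.show()  # shows the single row (2, second), as in 1.6.2
> {code}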