[
https://issues.apache.org/jira/browse/SPARK-18014?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Michael Patterson updated SPARK-18014:
--------------------------------------
Description:
I created a dataframe pipeline that filters the data on columnA, creates a new
columnB by applying a user-defined function to columnA, and then filters on
columnB. However, the optimizer merged the two filters into a single Filter
that sits below the withColumn projection in the optimized plan, so the UDF was
evaluated on rows that the first filter should have removed, which caused
errors due to unexpected input to the UDF.
Example code to reproduce:
import pyspark.sql.functions as F
import pyspark.sql.types as T
from functools import partial

# Three rows; only the values 1 and 2 are keys in my_dict.
data = [{'input':0}, {'input':1}, {'input':2}]
input_df = sc.parallelize(data).toDF()
my_dict = {1:'first', 2:'second'}

def apply_dict(input_dict, value):
    # Raises KeyError when value is not a key of input_dict.
    return input_dict[value]

test_udf = F.udf(partial(apply_dict, my_dict), T.StringType())
# Filter on input, derive output with the UDF, then filter on output.
test_df = (input_df.filter('input > 0')
           .withColumn('output', test_udf('input'))
           .filter(F.col('output').rlike('^s')))
test_df.explain(True)
Execution plan:
== Analyzed Logical Plan ==
input: bigint, output: string
Filter output#4 RLIKE ^s
+- Project [input#0L, partial(input#0L) AS output#4]
   +- Filter (input#0L > cast(0 as bigint))
      +- LogicalRDD [input#0L]

== Optimized Logical Plan ==
Project [input#0L, partial(input#0L) AS output#4]
+- Filter ((isnotnull(input#0L) && (input#0L > 0)) && partial(input#0L) RLIKE ^s)
   +- LogicalRDD [input#0L]
Executing test_df.show() after the above code in pyspark 2.0.1 yields:
KeyError: 0
Executing test_df.show() in pyspark 1.6.2 yields:
+-----+------+
|input|output|
+-----+------+
|    2|second|
+-----+------+
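The difference between the two results can be illustrated without Spark. The following is a minimal pure-Python sketch, not Spark's actual execution: it assumes that, once the filters are merged, the UDF is evaluated for every input row before the combined predicate is applied. The helpers run_sequential, run_merged and safe_apply_dict are hypothetical names introduced only for this illustration.

```python
import re
from functools import partial

my_dict = {1: 'first', 2: 'second'}
rows = [0, 1, 2]


def apply_dict(input_dict, value):
    # Same lookup as in the report: raises KeyError for unknown keys.
    return input_dict[value]


def safe_apply_dict(input_dict, value):
    # Hypothetical defensive variant: returns None for unknown keys.
    return input_dict.get(value)


def matches(output):
    # Rough stand-in for rlike('^s'); a None output never matches.
    return output is not None and re.search(r'^s', output) is not None


def run_sequential(udf, values):
    # Filter on input first, then apply the UDF, then filter on output.
    kept = [v for v in values if v > 0]
    projected = [(v, udf(v)) for v in kept]
    return [(v, out) for v, out in projected if matches(out)]


def run_merged(udf, values):
    # Assumption: the UDF runs on every row before the merged predicate.
    evaluated = [(v, udf(v)) for v in values]
    return [(v, out) for v, out in evaluated if v > 0 and matches(out)]


strict_udf = partial(apply_dict, my_dict)
safe_udf = partial(safe_apply_dict, my_dict)

print(run_sequential(strict_udf, rows))  # [(2, 'second')]
print(run_merged(safe_udf, rows))        # [(2, 'second')]
# run_merged(strict_udf, rows) raises KeyError: 0 under this assumption.
```

Under the same assumption, a UDF that tolerates unexpected input, for example F.udf(partial(safe_apply_dict, my_dict), T.StringType()), might serve as a workaround, since its result would no longer depend on the order in which the predicates are evaluated.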
> Filters are incorrectly being grouped together when there is processing in
> between
> ----------------------------------------------------------------------------------
>
> Key: SPARK-18014
> URL: https://issues.apache.org/jira/browse/SPARK-18014
> Project: Spark
> Issue Type: Bug
> Affects Versions: 2.0.1
> Environment: Pyspark 2.0.1, Ipython 4.2
> Reporter: Michael Patterson
> Priority: Minor
>