I have some PySpark code like the snippet below. I persist a DataFrame (which
is time-consuming to compute) to disk, call DataFrame.count to trigger the
persist immediately, then coalesce the DataFrame to reduce the number of
partitions (the original DataFrame has 30,000 partitions) and write it to
HDFS. Based on the execution time of the job stages and the execution plan, it
seems to me that the DataFrame is recomputed at df.coalesce(300). Does anyone
know why this happens?

    from pyspark import StorageLevel

    df = spark.read.parquet("/input/hdfs/path") \
        .filter(...) \
        .withColumn("new_col", my_pandas_udf("col0", "col1")) \
        .persist(StorageLevel.DISK_ONLY)
    df.count()  # action that triggers the persist
    df.coalesce(300).write.mode("overwrite").parquet(output_mod)

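One way to double-check the plan is to look for an InMemoryTableScan node,
which is how Spark's physical plan shows a read from persisted data. Below is
a minimal sketch of that check. The helper names explain_text and
plan_reads_from_cache are made up for illustration, and the sketch assumes
that PySpark's DataFrame.explain() prints through Python's standard output.

```python
import io
from contextlib import redirect_stdout


def explain_text(df) -> str:
    """Capture what df.explain() prints and return it as a string.

    Assumption: explain() writes via Python's stdout, so it can be
    redirected into an in-memory buffer.
    """
    buf = io.StringIO()
    with redirect_stdout(buf):
        df.explain()
    return buf.getvalue()


def plan_reads_from_cache(plan_text: str) -> bool:
    """Return True if the plan text contains an InMemoryTableScan node."""
    return "InMemoryTableScan" in plan_text
```

With the code above, plan_reads_from_cache(explain_text(df.coalesce(300)))
would be expected to be True if the write reads from the persisted data, and
False if the plan goes back to the original Parquet scan.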
BTW, it works well if I manually write the DataFrame to HDFS, read it back,
coalesce it and write it back to HDFS.
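
The manual workaround looks roughly like the sketch below. The function name
write_via_intermediate and the tmp_path parameter are hypothetical; tmp_path
stands for whatever intermediate HDFS location is used. Reading the files back
gives a DataFrame whose plan starts at those files, so the pandas UDF is not
run again during the coalesce and final write.

```python
def write_via_intermediate(spark, df, tmp_path, output_path, num_partitions=300):
    """Materialize df to tmp_path, read it back, coalesce, and write it out.

    spark is a SparkSession and df is the expensive DataFrame; tmp_path is a
    hypothetical intermediate location used only for this sketch.
    """
    # Step 1: write the expensive DataFrame once, with its original partitioning.
    df.write.mode("overwrite").parquet(tmp_path)

    # Step 2: read it back; this DataFrame's plan starts at the files on disk.
    materialized = spark.read.parquet(tmp_path)

    # Step 3: reduce the number of partitions and write the final output.
    materialized.coalesce(num_partitions).write.mode("overwrite").parquet(output_path)
    return materialized
```

This costs an extra write and read of the full data set, which is the price of
avoiding the recomputation.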
Originally posted at
https://stackoverflow.com/questions/70781494/a-persisted-spark-dataframe-is-computed-twice
Best,
----
Ben Du
Personal Blog<http://www.legendu.net/> | GitHub<https://github.com/dclong/> |
Bitbucket<https://bitbucket.org/dclong/> | Docker
Hub<https://hub.docker.com/r/dclong/>