I don't think coalesce itself (by repartitioning I assume you mean coalesce)
and deserialising take that much time. To add a bit more context, computing
the DataFrame is CPU-intensive rather than data/IO-intensive. I deliberately
call coalesce after df.count because I want to keep the large number of
partitions (30k) while computing the DataFrame, which gives much higher
parallelism. After the computation, I reduce the number of partitions (to
avoid having too many small files on HDFS). It typically takes about 5 hours
to compute the DataFrame (with 30k partitions) and write it to disk (without
any repartition or coalesce). If I manually write the computed DataFrame to
disk, read it back, coalesce it, and then write it back to disk, it also takes
about 5 hours. The code that I pasted in this thread, however, takes forever
to run: the DataFrame is obviously recomputed at df.coalesce, and with a
parallelism of only 300 partitions it is almost impossible to compute the
DataFrame in a reasonable amount of time.
I tried various ways, but none of them worked except manually writing to disk,
reading it back, repartitioning/coalescing it, and then writing it back to HDFS.
1. checkpoint by itself computes the DataFrame twice. (This is a known
existing bug of checkpoint.)
from pyspark.sql.functions import col

# checkpoint() requires a checkpoint directory, set beforehand with
# spark.sparkContext.setCheckpointDir(...).
output_mod = f"{output}/job={mod}"
spark.read.parquet("/input/hdfs/path") \
    .filter(col("n0") == n0) \
    .filter(col("n1") == n1) \
    .filter(col("h1") == h1) \
    .filter(col("j1").isin(j1)) \
    .filter(col("j0") == j0) \
    .filter(col("h0").isin(h0)) \
    .filter(col("id0").bitwiseOR(col("id1")) % jobs == mod) \
    .withColumn("test", test_score_r4(col("id0"), col("id1"))) \
    .checkpoint() \
    .coalesce(300) \
    .write.mode("overwrite").parquet(output_mod)
2. persist (to disk) + count computes the DataFrame twice.
from pyspark import StorageLevel

output_mod = f"{output}/job={mod}"
df = spark.read.parquet("/input/hdfs/path") \
    .filter(col("n0") == n0) \
    .filter(col("n1") == n1) \
    .filter(col("h1") == h1) \
    .filter(col("j1").isin(j1)) \
    .filter(col("j0") == j0) \
    .filter(col("h0").isin(h0)) \
    .filter(col("id0").bitwiseOR(col("id1")) % jobs == mod) \
    .withColumn("test", test_score_r4(col("id0"), col("id1"))) \
    .persist(StorageLevel.DISK_ONLY)
df.count()  # action intended to materialize the persisted data
df.coalesce(300).write.mode("overwrite").parquet(output_mod)
3. persist (to memory) + count computes the DataFrame twice.
output_mod = f"{output}/job={mod}"
df = spark.read.parquet("/input/hdfs/path") \
    .filter(col("n0") == n0) \
    .filter(col("n1") == n1) \
    .filter(col("h1") == h1) \
    .filter(col("j1").isin(j1)) \
    .filter(col("j0") == j0) \
    .filter(col("h0").isin(h0)) \
    .filter(col("id0").bitwiseOR(col("id1")) % jobs == mod) \
    .withColumn("test", test_score_r4(col("id0"), col("id1"))) \
    .persist(StorageLevel.MEMORY_ONLY)
df.count()
df.coalesce(300).write.mode("overwrite").parquet(output_mod)
4. persist (to memory) + checkpoint + coalesce computes the DataFrame twice.
output_mod = f"{output}/job={mod}"
# write() returns None, so the chain is not assigned to a variable.
spark.read.parquet("/input/hdfs/path") \
    .filter(col("n0") == n0) \
    .filter(col("n1") == n1) \
    .filter(col("h1") == h1) \
    .filter(col("j1").isin(j1)) \
    .filter(col("j0") == j0) \
    .filter(col("h0").isin(h0)) \
    .filter(col("id0").bitwiseOR(col("id1")) % jobs == mod) \
    .withColumn("test", test_score_r4(col("id0"), col("id1"))) \
    .persist(StorageLevel.MEMORY_ONLY) \
    .checkpoint() \
    .coalesce(300) \
    .write.mode("overwrite").parquet(output_mod)
5. persist (to memory) + checkpoint, without coalesce, computes the
DataFrame twice.
output_mod = f"{output}/job={mod}"
spark.read.parquet("/input/hdfs/path") \
    .filter(col("n0") == n0) \
    .filter(col("n1") == n1) \
    .filter(col("h1") == h1) \
    .filter(col("j1").isin(j1)) \
    .filter(col("j0") == j0) \
    .filter(col("h0").isin(h0)) \
    .filter(col("id0").bitwiseOR(col("id1")) % jobs == mod) \
    .withColumn("test", test_score_r4(col("id0"), col("id1"))) \
    .persist(StorageLevel.MEMORY_ONLY) \
    .checkpoint() \
    .write.mode("overwrite").parquet(output_mod)
6. cache (equivalent to persist with MEMORY_AND_DISK) + count + coalesce
computes it twice.
output_mod = f"{output}/job={mod}"
df = spark.read.parquet("/input/hdfs/path") \
    .filter(col("n0") == n0) \
    .filter(col("n1") == n1) \
    .filter(col("h1") == h1) \
    .filter(col("j1").isin(j1)) \
    .filter(col("j0") == j0) \
    .filter(col("h0").isin(h0)) \
    .filter(col("id0").bitwiseOR(col("id1")) % jobs == mod) \
    .withColumn("test", test_score_r4(col("id0"), col("id1"))) \
    .cache()
df.count()
df.coalesce(300).write.mode("overwrite").parquet(output_mod)
7. Manually writing the output computes it only once. The code below writes
the DataFrame to HDFS, and repart_hdfs, a function I wrote myself, then reads
it back, repartitions/coalesces it, and writes it back to HDFS.
spark.read.parquet("/input/hdfs/path") \
    .filter(col("n0") == n0) \
    .filter(col("n1") == n1) \
    .filter(col("h1") == h1) \
    .filter(col("j1").isin(j1)) \
    .filter(col("j0") == j0) \
    .filter(col("h0").isin(h0)) \
    .filter(col("id0").bitwiseOR(col("id1")) % jobs == mod) \
    .withColumn("test", test_score_r4(col("id0"), col("id1"))) \
    .write.mode("overwrite").parquet(output_mod)
repart_hdfs(spark, output_mod, 300, coalesce=True)
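repart_hdfs itself is not included in the thread. A minimal sketch of a helper in the same spirit follows; the name `repart_parquet` and its separate `dst` argument are assumptions for illustration, not the author's code. It relies only on the fact that reading the already-written Parquet output does not re-run the expensive upstream computation.

```python
def repart_parquet(spark, src, dst, num_partitions, coalesce=True):
    """Hypothetical helper (not the author's repart_hdfs): re-read a Parquet
    dataset that an earlier action already wrote, and rewrite it to `dst`
    with `num_partitions` partitions.

    Only the Parquet files under `src` are scanned, so the expensive
    transformations that produced them are not executed again.
    """
    df = spark.read.parquet(src)
    if coalesce:
        # Merge existing partitions without a shuffle (cheap, sizes may be uneven).
        df = df.coalesce(num_partitions)
    else:
        # Full shuffle into evenly sized partitions.
        df = df.repartition(num_partitions)
    df.write.mode("overwrite").parquet(dst)


# Usage, following the manual workflow above (the destination path is a placeholder):
# repart_parquet(spark, output_mod, f"{output_mod}_coalesced", 300)
```

Writing to a separate `dst` sidesteps a pitfall of an in-place rewrite: Spark generally refuses to overwrite a path that the same query is reading from, so an in-place version would need a temporary location plus a rename or delete step.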
Best,
----
Ben Du
Personal Blog<http://www.legendu.net/> | GitHub<https://github.com/dclong/> |
Bitbucket<https://bitbucket.org/dclong/> | Docker Hub<https://hub.docker.com/r/dclong/>
________________________________
From: Sebastian Piu
Sent: Sunday, January 30, 2022 12:44 AM
To: Benjamin Du
Subject: Re: A Persisted Spark DataFrame is computed twice
It's probably the repartitioning and deserialising of the df that you are
seeing take time. Try this:
1. Add another count after your current one and compare times
2. Move coalesce before persist
You should see
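The first check above amounts to timing two consecutive actions on the persisted DataFrame. A small sketch of such a timer; `timed` is a hypothetical helper, and the only Spark call it is meant to wrap here is `df.count`, which appears in the original code.

```python
import time
from typing import Callable, Tuple, TypeVar

T = TypeVar("T")


def timed(label: str, action: Callable[[], T]) -> Tuple[T, float]:
    """Run `action` once, print its wall-clock duration, and return
    (result, seconds)."""
    start = time.perf_counter()
    result = action()
    elapsed = time.perf_counter() - start
    print(f"{label}: {elapsed:.1f}s")
    return result, elapsed


# Usage with the persisted DataFrame from the original question:
# _, first = timed("first count", df.count)    # computes and persists df
# _, second = timed("second count", df.count)  # should only read persisted data
# If `second` is close to `first`, the persisted data is probably not reused.
```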
On Sun, 30 Jan 2022, 08:37 Benjamin Du wrote:
I have some PySpark code like the one below. Basically, I persist a DataFrame
(which is time-consuming to compute) to disk, call the method DataFrame.count
to trigger the caching/persist immediately, and then I coalesce the DataFrame
to reduce the number of partitions (the original DataFrame has 30,000
partitions) and write it to HDFS. Based on the execution time of job stages
and the execution plan, it seems to me that the DataFrame is recomputed at
df.coalesce(300). Does anyone know why this happens?
from pyspark import StorageLevel

df = spark.read.parquet("/input/hdfs/path") \
    .filter(...) \
    .withColumn("new_col", my_pandas_udf("col0", "col1")) \
    .persist(StorageLevel.DISK_ONLY)
df.count()  # trigger the persist immediately
df.coalesce(300).write.mode("overwrite").parquet(output_mod)
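One way to check from the execution plan whether `df.coalesce(300)` reuses the persisted data is to look for an `InMemoryTableScan` node in the physical plan: Spark SQL represents a persisted DataFrame (at any `StorageLevel`, including `DISK_ONLY`) as an `InMemoryRelation`, while a plan that recomputes the DataFrame shows the Parquet scan and the filters instead. The sketch below assumes that `DataFrame.explain()` prints the plan to Python's standard output, as PySpark does; `explain_text` and `plan_reads_cache` are hypothetical helper names.

```python
import contextlib
import io


def explain_text(df) -> str:
    """Capture the plan that df.explain() prints to stdout."""
    buffer = io.StringIO()
    with contextlib.redirect_stdout(buffer):
        df.explain()
    return buffer.getvalue()


def plan_reads_cache(plan_text: str) -> bool:
    """True if the physical plan scans persisted data (InMemoryTableScan)
    rather than recomputing the DataFrame from its source."""
    return "InMemoryTableScan" in plan_text


# Usage with the DataFrame from the question (after df.count()):
# print(plan_reads_cache(explain_text(df.coalesce(300))))
```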
BTW, it works well if I manually write the DataFrame to HDFS, read it back,
coalesce it and write it back to HDFS.
Originally posted at
https://stackoverflow.com/questions/70781494/a-persisted-spark-dataframe-is-computed-twice
Best,
----
Ben Du