Hi,

Can you please try to use Spark SQL instead of DataFrames and see the difference? You will get a lot of theoretical arguments, and that is fine, but they are largely just theories. Also try applying the function to the result of the filters as a sub-query, caching the filtered data first.

Regards,
Gourav Sengupta
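For concreteness, a minimal sketch of what this Spark SQL variant might look like, assuming the input path, filter variables (n0, n1, h1, j0, j1, h0, jobs, mod), output path, and test_score_r4 UDF that appear in the code further down this thread, and assuming the filter values are numeric literals:

    # A minimal sketch of the Spark SQL suggestion: filter in SQL, cache the
    # filtered sub-query, then apply the UDF to the cached view. All variable
    # names are taken from the code later in this thread; test_score_r4 is
    # assumed to be the pandas UDF defined elsewhere.
    spark.udf.register("test_score_r4", test_score_r4)
    spark.read.parquet("/input/hdfs/path").createOrReplaceTempView("t")

    j1_list = ", ".join(str(v) for v in j1)  # assumes numeric values; quote strings if needed
    h0_list = ", ".join(str(v) for v in h0)

    # Cache the result of the filters first ...
    spark.sql(f"""
        SELECT * FROM t
        WHERE n0 = {n0} AND n1 = {n1} AND h1 = {h1} AND j0 = {j0}
          AND j1 IN ({j1_list}) AND h0 IN ({h0_list})
          AND (id0 | id1) % {jobs} = {mod}
    """).cache().createOrReplaceTempView("filtered")

    # ... then apply the function to the cached sub-query and write the result.
    spark.sql("SELECT *, test_score_r4(id0, id1) AS test FROM filtered") \
        .coalesce(300).write.mode("overwrite").parquet(output_mod)

Comparing the physical plans of the SQL and DataFrame versions with EXPLAIN is the quickest way to see whether they actually differ.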
On Mon, Jan 31, 2022 at 8:00 AM Benjamin Du <legendu....@outlook.com> wrote:

I don't think coalesce (by repartitioning I assume you mean coalesce) itself and deserialising take that much time. To add a little bit more context, the computation of the DataFrame is CPU intensive rather than data/IO intensive. I purposely keep coalesce after df.count as I want to keep the large number of partitions (30k) when computing the DataFrame so that I can get a much higher parallelism. After the computation, I reduce the number of partitions (to avoid having too many small files on HDFS). It typically takes about 5 hours to compute the DataFrame (when 30k partitions are used) and write it to disk (without doing repartitioning or coalesce). If I manually write the computed DataFrame to disk, read it back, coalesce it and then write it back to disk, it also takes about 5 hours. The code that I pasted in this thread takes forever to run, as the DataFrame is obviously recomputed at df.coalesce, and with a parallelism of 300 partitions it is almost impossible to compute the DataFrame in a reasonable amount of time.

I tried various ways, but none of them worked except manually writing the DataFrame to disk, reading it back, repartitioning/coalescing it, and then writing it back to HDFS.

1. checkpoint by itself computes the DataFrame twice (this is a known existing bug of checkpoint):

    output_mod = f"{output}/job={mod}"
    spark.read.parquet("/input/hdfs/path") \
        .filter(col("n0") == n0) \
        .filter(col("n1") == n1) \
        .filter(col("h1") == h1) \
        .filter(col("j1").isin(j1)) \
        .filter(col("j0") == j0) \
        .filter(col("h0").isin(h0)) \
        .filter(col("id0").bitwiseOR(col("id1")) % jobs == mod) \
        .withColumn("test", test_score_r4(col("id0"), col("id1"))) \
        .checkpoint() \
        .coalesce(300) \
        .write.mode("overwrite").parquet(output_mod)

2. persist (to disk) + count computes the DataFrame twice:

    output_mod = f"{output}/job={mod}"
    df = spark.read.parquet("/input/hdfs/path") \
        .filter(col("n0") == n0) \
        .filter(col("n1") == n1) \
        .filter(col("h1") == h1) \
        .filter(col("j1").isin(j1)) \
        .filter(col("j0") == j0) \
        .filter(col("h0").isin(h0)) \
        .filter(col("id0").bitwiseOR(col("id1")) % jobs == mod) \
        .withColumn("test", test_score_r4(col("id0"), col("id1"))) \
        .persist(StorageLevel.DISK_ONLY)
    df.count()
    df.coalesce(300).write.mode("overwrite").parquet(output_mod)

3. persist (to memory) + count computes the DataFrame twice:

    output_mod = f"{output}/job={mod}"
    df = spark.read.parquet("/input/hdfs/path") \
        .filter(col("n0") == n0) \
        .filter(col("n1") == n1) \
        .filter(col("h1") == h1) \
        .filter(col("j1").isin(j1)) \
        .filter(col("j0") == j0) \
        .filter(col("h0").isin(h0)) \
        .filter(col("id0").bitwiseOR(col("id1")) % jobs == mod) \
        .withColumn("test", test_score_r4(col("id0"), col("id1"))) \
        .persist(StorageLevel.MEMORY_ONLY)
    df.count()
    df.coalesce(300).write.mode("overwrite").parquet(output_mod)
4. persist (to memory) + checkpoint + coalesce computes the DataFrame twice:

    output_mod = f"{output}/job={mod}"
    df = spark.read.parquet("/input/hdfs/path") \
        .filter(col("n0") == n0) \
        .filter(col("n1") == n1) \
        .filter(col("h1") == h1) \
        .filter(col("j1").isin(j1)) \
        .filter(col("j0") == j0) \
        .filter(col("h0").isin(h0)) \
        .filter(col("id0").bitwiseOR(col("id1")) % jobs == mod) \
        .withColumn("test", test_score_r4(col("id0"), col("id1"))) \
        .persist(StorageLevel.MEMORY_ONLY) \
        .checkpoint() \
        .coalesce(300).write.mode("overwrite").parquet(output_mod)

5. persist (to memory) + checkpoint, without coalesce, computes the DataFrame twice:

    output_mod = f"{output}/job={mod}"
    df = spark.read.parquet("/input/hdfs/path") \
        .filter(col("n0") == n0) \
        .filter(col("n1") == n1) \
        .filter(col("h1") == h1) \
        .filter(col("j1").isin(j1)) \
        .filter(col("j0") == j0) \
        .filter(col("h0").isin(h0)) \
        .filter(col("id0").bitwiseOR(col("id1")) % jobs == mod) \
        .withColumn("test", test_score_r4(col("id0"), col("id1"))) \
        .persist(StorageLevel.MEMORY_ONLY) \
        .checkpoint() \
        .write.mode("overwrite").parquet(output_mod)

6. cache (equivalent to persist to MEMORY_AND_DISK) + count + coalesce computes it twice:

    output_mod = f"{output}/job={mod}"
    df = spark.read.parquet("/input/hdfs/path") \
        .filter(col("n0") == n0) \
        .filter(col("n1") == n1) \
        .filter(col("h1") == h1) \
        .filter(col("j1").isin(j1)) \
        .filter(col("j0") == j0) \
        .filter(col("h0").isin(h0)) \
        .filter(col("id0").bitwiseOR(col("id1")) % jobs == mod) \
        .withColumn("test", test_score_r4(col("id0"), col("id1"))) \
        .cache()
    df.count()
    df.coalesce(300).write.mode("overwrite").parquet(output_mod)

Manually writing the output computes it only once. The function repart_hdfs below is a function I wrote myself to write a DataFrame to disk, read it back, repartition/coalesce it, and then write it back to HDFS.

    spark.read.parquet("/input/hdfs/path") \
        .filter(col("n0") == n0) \
        .filter(col("n1") == n1) \
        .filter(col("h1") == h1) \
        .filter(col("j1").isin(j1)) \
        .filter(col("j0") == j0) \
        .filter(col("h0").isin(h0)) \
        .filter(col("id0").bitwiseOR(col("id1")) % jobs == mod) \
        .withColumn("test", test_score_r4(col("id0"), col("id1"))) \
        .write.mode("overwrite").parquet(output_mod)
    repart_hdfs(spark, output_mod, 300, coalesce=True)

Best,

----

Ben Du

Personal Blog <http://www.legendu.net/> | GitHub <https://github.com/dclong/> | Bitbucket <https://bitbucket.org/dclong/> | Docker Hub <https://hub.docker.com/r/dclong/>

------------------------------
From: Sebastian Piu <sebastian....@gmail.com>
Sent: Sunday, January 30, 2022 12:44 AM
To: Benjamin Du <legendu....@outlook.com>
Cc: u...@spark.incubator.apache.org
Subject: Re: A Persisted Spark DataFrame is computed twice

It's probably the repartitioning and deserialising of the df that you are seeing take time. Try doing this:

1. Add another count after your current one and compare times.
2. Move coalesce before persist (see the sketch below).

You should see
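As a concrete illustration of suggestion 2, a minimal sketch using the same input path, filter variables, and test_score_r4 UDF as the code earlier in the thread; whether it actually avoids the recomputation in this particular job would still need to be verified in the Spark UI:

    # Sketch of "move coalesce before persist": coalesce(300) becomes part of
    # the persisted plan, so the persisted DataFrame already has 300 partitions
    # and the final write adds no further transformation on top of the cache.
    # Variable names and the test_score_r4 UDF are assumed from this thread.
    from pyspark import StorageLevel
    from pyspark.sql.functions import col

    df = (
        spark.read.parquet("/input/hdfs/path")
        .filter(col("n0") == n0)
        .filter(col("n1") == n1)
        .filter(col("h1") == h1)
        .filter(col("j1").isin(j1))
        .filter(col("j0") == j0)
        .filter(col("h0").isin(h0))
        .filter(col("id0").bitwiseOR(col("id1")) % jobs == mod)
        .withColumn("test", test_score_r4(col("id0"), col("id1")))
        .coalesce(300)                     # coalesce before persisting
        .persist(StorageLevel.DISK_ONLY)
    )
    df.count()                             # materialise the persisted data
    df.write.mode("overwrite").parquet(output_mod)

One trade-off to keep in mind, given the point above about keeping 30k partitions for the CPU-heavy UDF: coalesce does not introduce a shuffle, so placing it before persist also caps the parallelism of the upstream computation at 300 tasks, whereas repartition(300) would shuffle but preserve the upstream parallelism.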
On Sun, 30 Jan 2022, 08:37 Benjamin Du <legendu....@outlook.com> wrote:

I have some PySpark code like below. Basically, I persist a DataFrame (which is time-consuming to compute) to disk, call the method DataFrame.count to trigger the caching/persisting immediately, and then I coalesce the DataFrame to reduce the number of partitions (the original DataFrame has 30,000 partitions) and output it to HDFS. Based on the execution time of job stages and the execution plan, it seems to me that the DataFrame is recomputed at df.coalesce(300). Does anyone know why this happens?

    df = spark.read.parquet("/input/hdfs/path") \
        .filter(...) \
        .withColumn("new_col", my_pandas_udf("col0", "col1")) \
        .persist(StorageLevel.DISK_ONLY)
    df.count()
    df.coalesce(300).write.mode("overwrite").parquet(output_mod)

BTW, it works well if I manually write the DataFrame to HDFS, read it back, coalesce it and write it back to HDFS.

Originally posted at https://stackoverflow.com/questions/70781494/a-persisted-spark-dataframe-is-computed-twice.

Best,

----

Ben Du

Personal Blog <http://www.legendu.net/> | GitHub <https://github.com/dclong/> | Bitbucket <https://bitbucket.org/dclong/> | Docker Hub <https://hub.docker.com/r/dclong/>
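The repart_hdfs function itself is not shown in the thread; purely as an illustration, here is one hypothetical way such a write-read-coalesce-rewrite helper could be written. The function name repart_hdfs_sketch, the temporary path, and the use of Spark's internal _jvm/_jsc handles are all assumptions, not the author's code:

    # Hypothetical sketch of a repart_hdfs-style helper (the actual
    # implementation was not shared in this thread). It reads the written
    # output, reduces the number of partitions, writes to a temporary path
    # (overwriting a path while reading from it is not safe), and then swaps
    # the result into place via the Hadoop FileSystem API.
    from pyspark.sql import SparkSession


    def repart_hdfs_sketch(spark: SparkSession, path: str, num_partitions: int,
                           coalesce: bool = True) -> None:
        tmp_path = path.rstrip("/") + "_repart_tmp"  # assumed temporary location

        df = spark.read.parquet(path)
        df = df.coalesce(num_partitions) if coalesce else df.repartition(num_partitions)
        df.write.mode("overwrite").parquet(tmp_path)

        # Replace the original directory with the repartitioned copy.
        hadoop = spark._jvm.org.apache.hadoop
        fs = hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration())
        fs.delete(hadoop.fs.Path(path), True)  # recursive delete
        fs.rename(hadoop.fs.Path(tmp_path), hadoop.fs.Path(path))

Usage would then mirror the call earlier in the thread: repart_hdfs_sketch(spark, output_mod, 300, coalesce=True).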