Hi Shivam, I think what you are looking for is bucketing. With bucketing, the execution engine (Spark) knows how the data was shuffled before it was persisted. Unfortunately this is not supported for vanilla parquet files. Try saving the dataframe using the saveAsTable<https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrameWriter.saveAsTable.html> API along with the bucketBy<https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrameWriter.bucketBy.html> option. I think the newer table formats also support bucketing.
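Something along these lines might work -- just a sketch, where the input paths, table names, bucket count, and the join column "id" are placeholders for your setup:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# placeholder inputs -- replace with your own dataframes
df1 = spark.read.parquet("path/to/df1")
df2 = spark.read.parquet("path/to/df2")

# write both sides bucketed (and sorted) on the join key; the bucket
# count (4 here) and column ("id") must match on both sides
for name, df in [("bucketed_df1", df1), ("bucketed_df2", df2)]:
    (df.write
       .bucketBy(4, "id")
       .sortBy("id")
       .mode("overwrite")
       .format("parquet")
       .saveAsTable(name))

# reading them back through the table API keeps the bucketing metadata,
# so a join on "id" should not need an extra Exchange
spark.table("bucketed_df1").join(spark.table("bucketed_df2"), "id").explain()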
You can also go through this<https://blog.clairvoyantsoft.com/bucketing-in-spark-878d2e02140f> Medium article ("Bucketing in Spark" by Pawan Singh Negi, Clairvoyant Blog), which describes a problem similar to yours.

Regards,
Vibhor Gupta

________________________________
From: Shivam Verma <raj.shivam...@gmail.com>
Sent: Monday, December 26, 2022 8:08 PM
To: Russell Jurney <russell.jur...@gmail.com>
Cc: Gurunandan <gurunandan....@gmail.com>; user@spark.apache.org <user@spark.apache.org>
Subject: Re: Check if shuffle is caused for repartitioned pyspark dataframes
I tried sorting the repartitioned dataframes on the partition key before saving them as parquet files. However, when I read those repartitioned-sorted dataframes back and join them on the partition key, the Spark plan still shows an `Exchange hashpartitioning` step, which I want to avoid:

== Physical Plan ==
*(5) HashAggregate(keys=[id#15373L], functions=[sum(col_20#15393), sum(col_40#15413), max(col_60#15433)])
+- *(5) HashAggregate(keys=[id#15373L], functions=[partial_sum(col_20#15393), partial_sum(col_40#15413), partial_max(col_60#15433)])
   +- *(5) Project [id#15373L, col_20#15393, col_40#15413, col_60#15433]
      +- *(5) SortMergeJoin [id#15373L], [id#15171L], Inner
         :- *(2) Sort [id#15373L ASC NULLS FIRST], false, 0
         :  +- Exchange hashpartitioning(id#15373L, 4)  <========================= Want to avoid this
         :     +- *(1) Project [id#15373L, col_20#15393, col_40#15413, col_60#15433]
         :        +- *(1) Filter isnotnull(id#15373L)
         :           +- *(1) FileScan parquet [id#15373L,col_20#15393,col_40#15413,col_60#15433] Batched: true, Format: Parquet, Location: InMemoryFileIndex[s3a://performance/co_partition_test/repartitioned_df2..., PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint,col_20:double,col_40:double,col_60:double>
         +- *(4) Sort [id#15171L ASC NULLS FIRST], false, 0
            +- Exchange hashpartitioning(id#15171L, 4)  <========================= Want to avoid this
               +- *(3) Project [id#15171L]
                  +- *(3) Filter isnotnull(id#15171L)
                     +- *(3) FileScan parquet [id#15171L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[s3a://performance/co_partition_test/repartitioned_df1..., PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>

How do I ensure that this Exchange hashpartitioning step is skipped? Or is the Exchange hashpartitioning part of the Spark plan but won't actually do any repartitioning, so no overhead would be involved?

On Fri, Dec 23, 2022 at 10:08 PM Russell Jurney <russell.jur...@gmail.com> wrote:

This may not be good advice but... could you sort by the partition key to ensure the partitions match up? Thinking of olden times :)

On Fri, Dec 23, 2022 at 4:42 AM Shivam Verma <raj.shivam...@gmail.com> wrote:

Hi Gurunandan,

Thanks for the reply! I do see the exchange operator in the SQL tab, but I can see it in both experiments:
1. Using repartitioned dataframes
2. Using initial dataframes

Does that mean that the repartitioned dataframes are not actually "co-partitioned"? If that's the case, I have two more questions:
1. Why is the job with repartitioned dataframes faster (at least 3x) than the job using the initial dataframes?
2. How do I ensure co-partitioning for pyspark dataframes?

Thanks,
Shivam

On Wed, Dec 14, 2022 at 5:58 PM Gurunandan <gurunandan....@gmail.com> wrote:

Hi,
One of the options for validation is to navigate to the `SQL` tab in the Spark UI and click on the query of interest to view its detailed information. We need to check whether the Exchange operator is present for a shuffle, as shown in the attachment. Otherwise, we can print the executed plan and check for the Exchange operator in the physical plan.
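For example, a minimal sketch of checking the plan (the file paths and the join column "id" are placeholders):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# read the two pre-repartitioned datasets back (placeholder paths)
df1 = spark.read.parquet("path/to/repartitioned_df1")
df2 = spark.read.parquet("path/to/repartitioned_df2")

# print the physical plan of the join; a shuffle shows up as
# "Exchange hashpartitioning(...)" in the plan output
df1.join(df2, "id").explain()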
On Wed, Dec 14, 2022 at 10:56 AM Shivam Verma <raj.shivam...@gmail.com> wrote:
>
> Hello folks,
>
> I have a use case where I save two pyspark dataframes as parquet files and
> then use them later to join with each other or with other tables and perform
> multiple aggregations.
>
> Since I know the column being used in the downstream joins and groupby, I was
> hoping I could use co-partitioning for the two dataframes when saving them
> and avoid a shuffle later.
>
> I repartitioned the two dataframes (providing the same number of partitions and
> the same column for repartitioning).
>
> While I'm seeing an improvement in execution time with the above approach,
> how do I confirm that a shuffle is actually NOT happening (maybe through the
> Spark UI)?
> The spark plan and shuffle read/write are the same in the two scenarios:
> 1. Using repartitioned dataframes to perform join+aggregation
> 2. Using the base dataframes themselves (without explicit repartitioning) to perform
> join+aggregation
>
> I have a StackOverflow post with more details regarding the same:
> https://stackoverflow.com/q/74771971/14741697
>
> Thanks in advance, appreciate your help!
>
> Regards,
> Shivam
>
---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscr...@spark.apache.org

--
Thanks,
Russell Jurney @rjurney<http://twitter.com/rjurney> russell.jur...@gmail.com LI<http://linkedin.com/in/russelljurney> FB<http://facebook.com/jurney> datasyndrome.com Book a time on Calendly<https://calendly.com/rjurney_personal/30min>