Hi Shivam,

I think what you are looking for is bucket optimization: the execution engine 
(Spark) knows how the data was shuffled before it was persisted.
Unfortunately this is not supported when you use vanilla parquet files.
Try saving the dataframe using the 
saveAsTable<https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrameWriter.saveAsTable.html> 
API along with the 
bucketBy<https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrameWriter.bucketBy.html> 
option.
I think the new table formats also support bucketing.
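A rough sketch of what that could look like (the bucket count, table names and 
toy dataframes below are illustrative, not taken from your job):

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)  # force a sort-merge join for this demo

# Toy stand-ins for the two dataframes being joined on "id".
df1 = spark.range(0, 1000).withColumn("col_20", F.rand())
df2 = spark.range(0, 1000)

# Bucket (and sort) both tables on the join key with the same bucket count,
# so Spark can plan the join below without re-shuffling either side.
df1.write.bucketBy(4, "id").sortBy("id").mode("overwrite").saveAsTable("df1_bucketed")
df2.write.bucketBy(4, "id").sortBy("id").mode("overwrite").saveAsTable("df2_bucketed")

joined = spark.table("df1_bucketed").join(spark.table("df2_bucketed"), "id")
joined.explain()  # the physical plan should show no Exchange hashpartitioning on the join inputs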

You can also go through 
this<https://blog.clairvoyantsoft.com/bucketing-in-spark-878d2e02140f> Medium 
article, which describes a problem similar to yours.

Regards,
Vibhor Gupta
________________________________
From: Shivam Verma <raj.shivam...@gmail.com>
Sent: Monday, December 26, 2022 8:08 PM
To: Russell Jurney <russell.jur...@gmail.com>
Cc: Gurunandan <gurunandan....@gmail.com>; user@spark.apache.org 
<user@spark.apache.org>
Subject: EXT: Re: Check if shuffle is caused for repartitioned pyspark 
dataframes

I tried sorting the repartitioned dataframes on the partition key before saving 
them as parquet files. However, when I read those repartitioned, sorted dataframes 
back and join them on the partition key, the Spark plan still shows an `Exchange 
hashpartitioning` step, which I want to avoid:

== Physical Plan ==
*(5) HashAggregate(keys=[id#15373L], functions=[sum(col_20#15393), 
sum(col_40#15413), max(col_60#15433)])
+- *(5) HashAggregate(keys=[id#15373L], functions=[partial_sum(col_20#15393), 
partial_sum(col_40#15413), partial_max(col_60#15433)])
   +- *(5) Project [id#15373L, col_20#15393, col_40#15413, col_60#15433]
      +- *(5) SortMergeJoin [id#15373L], [id#15171L], Inner
         :- *(2) Sort [id#15373L ASC NULLS FIRST], false, 0
         :  +- Exchange hashpartitioning(id#15373L, 4) 
<========================= Want to avoid this
         :     +- *(1) Project [id#15373L, col_20#15393, col_40#15413, 
col_60#15433]
         :        +- *(1) Filter isnotnull(id#15373L)
         :           +- *(1) FileScan parquet 
[id#15373L,col_20#15393,col_40#15413,col_60#15433] Batched: true, Format: 
Parquet, Location: 
InMemoryFileIndex[s3a://performance/co_partition_test/repartitioned_df2..., 
PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: 
struct<id:bigint,col_20:double,col_40:double,col_60:double>
         +- *(4) Sort [id#15171L ASC NULLS FIRST], false, 0
            +- Exchange hashpartitioning(id#15171L, 4) 
<========================= Want to avoid this
               +- *(3) Project [id#15171L]
                  +- *(3) Filter isnotnull(id#15171L)
                     +- *(3) FileScan parquet [id#15171L] Batched: true, 
Format: Parquet, Location: 
InMemoryFileIndex[s3a://performance/co_partition_test/repartitioned_df1..., 
PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: 
struct<id:bigint>

How do I ensure that this Exchange hashpartitioning step is skipped? Or is the 
Exchange hashpartitioning only part of the Spark plan, i.e. it won't actually 
repartition anything and hence involves no overhead?
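For reference, the write/read path looks roughly like this (df1/df2 are the 
original dataframes; the paths and partition count are simplified placeholders):

from pyspark.sql import functions as F

# Repartition on the join key, sort within partitions, and persist as parquet.
df1.repartition(4, "id").sortWithinPartitions("id") \
    .write.mode("overwrite").parquet("s3a://<bucket>/repartitioned_df1")
df2.repartition(4, "id").sortWithinPartitions("id") \
    .write.mode("overwrite").parquet("s3a://<bucket>/repartitioned_df2")

# Read the repartitioned data back and join + aggregate on the same key.
left = spark.read.parquet("s3a://<bucket>/repartitioned_df1")
right = spark.read.parquet("s3a://<bucket>/repartitioned_df2")
(left.join(right, "id")
     .groupBy("id")
     .agg(F.sum("col_20"), F.sum("col_40"), F.max("col_60"))
     .explain())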

On Fri, Dec 23, 2022 at 10:08 PM Russell Jurney 
<russell.jur...@gmail.com<mailto:russell.jur...@gmail.com>> wrote:
This may not be good advice but... could you sort by the partition key to 
ensure the partitions match up? Thinking of olden times :)

On Fri, Dec 23, 2022 at 4:42 AM Shivam Verma 
<raj.shivam...@gmail.com<mailto:raj.shivam...@gmail.com>> wrote:
Hi Gurunandan,

Thanks for the reply!

I do see the exchange operator in the SQL tab, but I can see it in both the 
experiments:
1. Using repartitioned dataframes
2. Using initial dataframes

Does that mean that the repartitioned dataframes are not actually 
"co-partitioned"?
If that's the case, I have two more questions:

1. Why is the job with repartitioned dataframes at least 3x faster than the job 
using the initial dataframes?
2. How do I ensure co-partitioning for pyspark dataframes?

Thanks,
Shivam



On Wed, Dec 14, 2022 at 5:58 PM Gurunandan 
<gurunandan....@gmail.com<mailto:gurunandan....@gmail.com>> wrote:
Hi,
One of the options for validation is to open the SQL tab in the Spark UI
and click on the query of interest to view its detailed information. We
need to check whether an Exchange operator, which indicates a shuffle, is
present, as shown in the attachment.

Otherwise we can print the executed plan and check for an Exchange
operator in the physical plan.
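For example (df1/df2 here standing in for whatever dataframes are being joined; 
just an illustration):

joined = df1.join(df2, "id")
joined.explain(True)  # prints the parsed, analyzed, optimized and physical plans
# If "Exchange hashpartitioning" shows up in the physical plan, a shuffle is
# planned for that stage; if it is absent, no shuffle will happen there.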

On Wed, Dec 14, 2022 at 10:56 AM Shivam Verma 
<raj.shivam...@gmail.com<mailto:raj.shivam...@gmail.com>> wrote:
>
> Hello folks,
>
> I have a use case where I save two pyspark dataframes as parquet files and 
> then use them later to join with each other or with other tables and perform 
> multiple aggregations.
>
> Since I know the column being used in the downstream joins and groupby, I was 
> hoping I could use co-partitioning for the two dataframes when saving them 
> and avoid shuffle later.
>
> I repartitioned the two dataframes (providing same number of partitions and 
> same column for repartitioning).
>
> While I'm seeing an improvement in execution time with the above approach, 
> how do I confirm that a shuffle is actually NOT happening (maybe through 
> SparkUI)?
> The spark plan and shuffle read/write are the same in the two scenarios:
> 1. Using repartitioned dataframes to perform join+aggregation
> 2. Using the base dataframes themselves (without explicit repartitioning) to perform 
> join+aggregation
>
> I have a StackOverflow post with more details regarding the same:
> https://stackoverflow.com/q/74771971/14741697
>
> Thanks in advance, appreciate your help!
>
> Regards,
> Shivam
>
