Hi Sid,

This article is concise and fairly up-to-date:
Spark’s Logical and Physical plans … When, Why, How and Beyond.
<https://medium.com/datalex/sparks-logical-and-physical-plans-when-why-how-and-beyond-8cd1947b605a>

It is a good start. If anything still needs explaining after you have read
it, refer back here.

In Spark, the optimizer is called "Catalyst" and can be represented by the
diagram below. It produces different types of plans as it works through the
following phases:

- Analysis
- Logical Optimization
- Physical Planning
- Cost Model Analysis
- Code Generation

view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>

https://en.everybodywiki.com/Mich_Talebzadeh


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.


On Sun, 20 Feb 2022 at 14:49, Sid <flinkbyhe...@gmail.com> wrote:

> Thank you so much for your reply, Mich. I will go through it. However, I
> want to understand how to read this plan. If I face any errors, or if I
> want to see how Spark is cost-optimizing, how should I approach it?
>
> Could you explain it in layman's terms?
>
> Thanks,
> Sid
>
> On Sun, 20 Feb 2022, 17:50 Mich Talebzadeh, <mich.talebza...@gmail.com>
> wrote:
>
>> Do a Google search on *sort-merge spark*. There are plenty of notes and
>> examples on the topic. Nested-loop (NLJ), sort-merge and hash joins and
>> their derivatives are common join algorithms in database systems; they
>> were not created by Spark. At any given time there are reasons why one
>> specific join is preferred over another, and over time the underlying
>> data volume may change, which could result in the optimizer opting for
>> another type of join.
>>
>> For some relatively recent discussion of Spark join strategies, have a
>> look here:
>> <https://blog.clairvoyantsoft.com/apache-spark-join-strategies-e4ebc7624b06>
>>
>> HTH
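To make the join-strategy point above concrete, here is a minimal PySpark
sketch (Spark 3.x assumed; the file paths, table contents and the CustomerID
join key are purely illustrative, not from this thread) of the knobs that
steer which join the optimizer picks:

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import broadcast

    spark = SparkSession.builder.appName("join_strategy_demo").getOrCreate()

    # Hypothetical inputs: a large customer table and a small lookup table.
    big = spark.read.option("header", True).csv("/tmp/customers_big.csv")
    small = spark.read.option("header", True).csv("/tmp/states_small.csv")

    # 1) Default behaviour: if neither side fits under the broadcast
    #    threshold (10 MB by default), an equi-join usually ends up as a
    #    SortMergeJoin.
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 10 * 1024 * 1024)
    big.join(small, "CustomerID").explain()

    # 2) Hint that the small side should be broadcast, giving a
    #    BroadcastHashJoin and avoiding the shuffle and sort.
    big.join(broadcast(small), "CustomerID").explain()

    # 3) Tell the planner not to prefer sort-merge, so a shuffled hash join
    #    can be chosen when one side is small enough per partition.
    spark.conf.set("spark.sql.join.preferSortMergeJoin", "false")
    big.join(small, "CustomerID").explain()

Running explain() after each variant shows whether the planner chose a
SortMergeJoin, BroadcastHashJoin or ShuffledHashJoin under that
configuration.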
>>
>> On Sat, 19 Feb 2022 at 10:00, Sid Kal <flinkbyhe...@gmail.com> wrote:
>>
>>> I wrote a query like below and I am trying to understand its query
>>> execution plan.
>>>
>>> spark.sql("select a.CustomerID,a.CustomerName,b.state from df a join
>>> df1 b on a.CustomerID=b.CustomerID").explain(mode="extended")
>>>
>>> == Parsed Logical Plan ==
>>> 'Project ['a.CustomerID, 'a.CustomerName, 'b.state]
>>> +- 'Join Inner, ('a.CustomerID = 'b.CustomerID)
>>>    :- 'SubqueryAlias a
>>>    :  +- 'UnresolvedRelation [df], [], false
>>>    +- 'SubqueryAlias b
>>>       +- 'UnresolvedRelation [df1], [], false
>>>
>>> == Analyzed Logical Plan ==
>>> CustomerID: int, CustomerName: string, state: string
>>> Project [CustomerID#640, CustomerName#641, state#988]
>>> +- Join Inner, (CustomerID#640 = CustomerID#978)
>>>    :- SubqueryAlias a
>>>    :  +- SubqueryAlias df
>>>    :     +- Relation[CustomerID#640,CustomerName#641,Pincode#642,DOB#643,CompletedAddressWithPincode#644,ContactDetailsPhone1#645,ContactDetailsPhone2#646,ContactDetailsMobile#647,ContactDetailsEmail#648,City#649,State#650,ComplaintFlag#651,ProfileCompletenessPercentage#652,WhatsAppOptIn#653,HNINonHNI#654,S2SFlag#655] csv
>>>    +- SubqueryAlias b
>>>       +- SubqueryAlias df1
>>>          +- Relation[CustomerID#978,CustomerName#979,Pincode#980,DOB#981,CompletedAddressWithPincode#982,ContactDetailsPhone1#983,ContactDetailsPhone2#984,ContactDetailsMobile#985,ContactDetailsEmail#986,City#987,State#988,ComplaintFlag#989,ProfileCompletenessPercentage#990,WhatsAppOptIn#991,HNINonHNI#992,S2SFlag#993] csv
>>>
>>> == Optimized Logical Plan ==
>>> Project [CustomerID#640, CustomerName#641, state#988]
>>> +- Join Inner, (CustomerID#640 = CustomerID#978)
>>>    :- Project [CustomerID#640, CustomerName#641]
>>>    :  +- Filter isnotnull(CustomerID#640)
>>>    :     +- Relation[CustomerID#640,CustomerName#641,Pincode#642,DOB#643,CompletedAddressWithPincode#644,ContactDetailsPhone1#645,ContactDetailsPhone2#646,ContactDetailsMobile#647,ContactDetailsEmail#648,City#649,State#650,ComplaintFlag#651,ProfileCompletenessPercentage#652,WhatsAppOptIn#653,HNINonHNI#654,S2SFlag#655] csv
>>>    +- Project [CustomerID#978, State#988]
>>>       +- Filter isnotnull(CustomerID#978)
>>>          +- Relation[CustomerID#978,CustomerName#979,Pincode#980,DOB#981,CompletedAddressWithPincode#982,ContactDetailsPhone1#983,ContactDetailsPhone2#984,ContactDetailsMobile#985,ContactDetailsEmail#986,City#987,State#988,ComplaintFlag#989,ProfileCompletenessPercentage#990,WhatsAppOptIn#991,HNINonHNI#992,S2SFlag#993] csv
>>>
>>> == Physical Plan ==
>>> *(5) Project [CustomerID#640, CustomerName#641, state#988]
>>> +- *(5) SortMergeJoin [CustomerID#640], [CustomerID#978], Inner
>>>    :- *(2) Sort [CustomerID#640 ASC NULLS FIRST], false, 0
>>>    :  +- Exchange hashpartitioning(CustomerID#640, 200), ENSURE_REQUIREMENTS, [id=#451]
>>>    :     +- *(1) Filter isnotnull(CustomerID#640)
>>>    :        +- FileScan csv [CustomerID#640,CustomerName#641] Batched: false, DataFilters: [isnotnull(CustomerID#640)], Format: CSV, Location: InMemoryFileIndex[file:/home/siddhesh/Documents/BAXA/csv_output/bulk_with_only_no], PartitionFilters: [], PushedFilters: [IsNotNull(CustomerID)], ReadSchema: struct<CustomerID:int,CustomerName:string>
>>>    +- *(4) Sort [CustomerID#978 ASC NULLS FIRST], false, 0
>>>       +- Exchange hashpartitioning(CustomerID#978, 200), ENSURE_REQUIREMENTS, [id=#459]
>>>          +- *(3) Filter isnotnull(CustomerID#978)
>>>             +- FileScan csv [CustomerID#978,State#988] Batched: false, DataFilters: [isnotnull(CustomerID#978)], Format: CSV, Location: InMemoryFileIndex[file:/home/siddhesh/Documents/BAXA/csv_output/bulk_with_only_no], PartitionFilters: [], PushedFilters: [IsNotNull(CustomerID)], ReadSchema: struct<CustomerID:int,State:string>
>>>
>>> I know some of the features: Project is like the select clause, and
>>> Filter is whatever filters we use in the query. Where can I look for
>>> the cost optimization in this plan? Suppose in the future my query
>>> takes longer to execute; by looking at this plan, how can I figure out
>>> what exactly is happening and what needs to be modified on the query
>>> side? Also, Spark uses a sort-merge join by default, as I can see from
>>> the plan, but when does it opt for a Sort-Merge Join and when does it
>>> opt for a Shuffle-Hash Join?
>>>
>>> Thanks,
>>> Sid
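On the question of where the cost information lives, here is a minimal
sketch (Spark 3.x assumed, reusing the df and df1 views registered for the
query above) of the explain() modes that expose it:

    # Same join as in the thread; q is the DataFrame returned by spark.sql.
    q = spark.sql("""
        select a.CustomerID, a.CustomerName, b.state
        from df a join df1 b
        on a.CustomerID = b.CustomerID
    """)

    # Per-operator breakdown of the physical plan; easier to map to slow
    # stages in the Spark UI than the default tree output.
    q.explain(mode="formatted")

    # Optimized logical plan annotated with the statistics the cost model
    # uses (sizeInBytes, and rowCount once table statistics are available).
    q.explain(mode="cost")

If a query later slows down, comparing the formatted plan (together with the
per-stage times in the Spark UI) against an earlier run is usually the
quickest way to see which operator changed.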