Hi Mich,

Thanks for the link. I will go through it. I have two doubts regarding sort-merge join.
1) I came across an article which mentioned that sort-merge join is a better join technique because the keys are sorted, so it does not have to scan the entire tables. Suppose I have keys like 1,2,4,10 and the other table has keys 1,2,3,4,5,6,7,8,9,10. In that case, for an inner join, I will get the rows for keys 1,2,4,10 as the output. So how does it work exactly in my case? Assume both are huge datasets.

2) If I don't have sortable keys but still have a huge dataset that needs to be joined, what can I do in that case? Suppose I have a "Department" column and need to join with the other table on "Department". Can strings be sorted as well? What exactly is meant by non-sortable keys?

Thanks,
Sid

On Wed, Feb 23, 2022 at 11:46 PM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

> Hi Sid,
>
> For now, with regard to point 2:
>
> 2) Predicate push down under the optimized logical plan. Could you please
> help me to understand the predicate pushdown with some other simple example?
>
> Please see this good explanation with examples:
>
> Using Spark predicate push down in Spark SQL queries
> <https://docs.datastax.com/en/dse/6.0/dse-dev/datastax_enterprise/spark/sparkPredicatePushdown.html>
>
> HTH
>
> view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
> https://en.everybodywiki.com/Mich_Talebzadeh
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
> On Wed, 23 Feb 2022 at 17:57, Sid <flinkbyhe...@gmail.com> wrote:
>
>> Hi,
>>
>> Can you help me with my doubts? Any links would also be helpful.
>>
>> Thanks,
>> Sid
>>
>> On Wed, Feb 23, 2022 at 1:22 AM Sid Kal <flinkbyhe...@gmail.com> wrote:
>>
>>> Hi Mich / Gourav,
>>>
>>> Thanks for your time :) Much appreciated. I went through the article
>>> shared by Mich about the query execution plan. I pretty much understood
>>> most of it so far, except for the two things below.
>>>
>>> 1) HashAggregate in the plan: does this always indicate "group by"
>>> columns?
>>> 2) Predicate push down under the optimized logical plan. Could you
>>> please help me to understand the predicate pushdown with some other
>>> simple example?
>>>
>>> On Mon, Feb 21, 2022 at 1:52 PM Gourav Sengupta
>>> <gourav.sengu...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I think that the best option is to use the SPARK UI. In SPARK 3.x the
>>>> UI and its additional settings are fantastic. Also look at the settings
>>>> for Adaptive Query Execution in SPARK; under certain conditions it
>>>> really works wonders.
>>>>
>>>> For certain long queries, depending on how you finally trigger the
>>>> action that executes the query, whether you are using SPARK Dataframes
>>>> or SPARK SQL, the settings in SPARK (look at the settings for SPARK
>>>> 3.x), and a few other aspects, you will see that the plan is sometimes
>>>> quite cryptic and difficult to read.
>>>>
>>>> Regards,
>>>> Gourav Sengupta
>>>>
>>>> On Sun, Feb 20, 2022 at 7:32 PM Sid Kal <flinkbyhe...@gmail.com> wrote:
>>>>
>>>>> Hi Gourav,
>>>>>
>>>>> Right now I am just trying to understand the query execution plan by
>>>>> executing a simple join example via Spark SQL.
>>>>> The overall goal is to understand these plans so that, going forward,
>>>>> if my query runs slow due to data skew or some other issue, I should at
>>>>> least be able to understand what exactly is happening on the master and
>>>>> worker sides, as in MapReduce.
>>>>>
>>>>> On Sun, Feb 20, 2022 at 9:06 PM Gourav Sengupta
>>>>> <gourav.sengu...@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> What are you trying to achieve by this?
>>>>>>
>>>>>> If there is a performance deterioration, try to collect the query
>>>>>> execution runtime statistics from SPARK SQL. They can be seen in the
>>>>>> SPARK SQL UI and, if I am not wrong, are also available over APIs.
>>>>>>
>>>>>> Please ensure that you are not trying to over-automate things.
>>>>>>
>>>>>> Reading up on how to understand the plans may be good, depending on
>>>>>> what you are trying to do.
>>>>>>
>>>>>> Regards,
>>>>>> Gourav Sengupta
>>>>>>
>>>>>> On Sat, Feb 19, 2022 at 10:00 AM Sid Kal <flinkbyhe...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> I wrote a query like the one below and I am trying to understand its
>>>>>>> query execution plan.
>>>>>>>
>>>>>>> >>> spark.sql("select a.CustomerID,a.CustomerName,b.state from df a join df1 b on a.CustomerID=b.CustomerID").explain(mode="extended")
>>>>>>>
>>>>>>> == Parsed Logical Plan ==
>>>>>>> 'Project ['a.CustomerID, 'a.CustomerName, 'b.state]
>>>>>>> +- 'Join Inner, ('a.CustomerID = 'b.CustomerID)
>>>>>>>    :- 'SubqueryAlias a
>>>>>>>    :  +- 'UnresolvedRelation [df], [], false
>>>>>>>    +- 'SubqueryAlias b
>>>>>>>       +- 'UnresolvedRelation [df1], [], false
>>>>>>>
>>>>>>> == Analyzed Logical Plan ==
>>>>>>> CustomerID: int, CustomerName: string, state: string
>>>>>>> Project [CustomerID#640, CustomerName#641, state#988]
>>>>>>> +- Join Inner, (CustomerID#640 = CustomerID#978)
>>>>>>>    :- SubqueryAlias a
>>>>>>>    :  +- SubqueryAlias df
>>>>>>>    :     +- Relation[CustomerID#640,CustomerName#641,Pincode#642,DOB#643,CompletedAddressWithPincode#644,ContactDetailsPhone1#645,ContactDetailsPhone2#646,ContactDetailsMobile#647,ContactDetailsEmail#648,City#649,State#650,ComplaintFlag#651,ProfileCompletenessPercentage#652,WhatsAppOptIn#653,HNINonHNI#654,S2SFlag#655] csv
>>>>>>>    +- SubqueryAlias b
>>>>>>>       +- SubqueryAlias df1
>>>>>>>          +- Relation[CustomerID#978,CustomerName#979,Pincode#980,DOB#981,CompletedAddressWithPincode#982,ContactDetailsPhone1#983,ContactDetailsPhone2#984,ContactDetailsMobile#985,ContactDetailsEmail#986,City#987,State#988,ComplaintFlag#989,ProfileCompletenessPercentage#990,WhatsAppOptIn#991,HNINonHNI#992,S2SFlag#993] csv
>>>>>>>
>>>>>>> == Optimized Logical Plan ==
>>>>>>> Project [CustomerID#640, CustomerName#641, state#988]
>>>>>>> +- Join Inner, (CustomerID#640 = CustomerID#978)
>>>>>>>    :- Project [CustomerID#640, CustomerName#641]
>>>>>>>    :  +- Filter isnotnull(CustomerID#640)
>>>>>>>    :     +- Relation[CustomerID#640,CustomerName#641,Pincode#642,DOB#643,CompletedAddressWithPincode#644,ContactDetailsPhone1#645,ContactDetailsPhone2#646,ContactDetailsMobile#647,ContactDetailsEmail#648,City#649,State#650,ComplaintFlag#651,ProfileCompletenessPercentage#652,WhatsAppOptIn#653,HNINonHNI#654,S2SFlag#655] csv
>>>>>>>    +- Project [CustomerID#978, State#988]
>>>>>>>       +- Filter isnotnull(CustomerID#978)
>>>>>>>          +- Relation[CustomerID#978,CustomerName#979,Pincode#980,DOB#981,CompletedAddressWithPincode#982,ContactDetailsPhone1#983,ContactDetailsPhone2#984,ContactDetailsMobile#985,ContactDetailsEmail#986,City#987,State#988,ComplaintFlag#989,ProfileCompletenessPercentage#990,WhatsAppOptIn#991,HNINonHNI#992,S2SFlag#993] csv
>>>>>>>
>>>>>>> == Physical Plan ==
>>>>>>> *(5) Project [CustomerID#640, CustomerName#641, state#988]
>>>>>>> +- *(5) SortMergeJoin [CustomerID#640], [CustomerID#978], Inner
>>>>>>>    :- *(2) Sort [CustomerID#640 ASC NULLS FIRST], false, 0
>>>>>>>    :  +- Exchange hashpartitioning(CustomerID#640, 200), ENSURE_REQUIREMENTS, [id=#451]
>>>>>>>    :     +- *(1) Filter isnotnull(CustomerID#640)
>>>>>>>    :        +- FileScan csv [CustomerID#640,CustomerName#641] Batched: false, DataFilters: [isnotnull(CustomerID#640)], Format: CSV, Location: InMemoryFileIndex[file:/home/siddhesh/Documents/BAXA/csv_output/bulk_with_only_no], PartitionFilters: [], PushedFilters: [IsNotNull(CustomerID)], ReadSchema: struct<CustomerID:int,CustomerName:string>
>>>>>>>    +- *(4) Sort [CustomerID#978 ASC NULLS FIRST], false, 0
>>>>>>>       +- Exchange hashpartitioning(CustomerID#978, 200), ENSURE_REQUIREMENTS, [id=#459]
>>>>>>>          +- *(3) Filter isnotnull(CustomerID#978)
>>>>>>>             +- FileScan csv [CustomerID#978,State#988] Batched: false, DataFilters: [isnotnull(CustomerID#978)], Format: CSV, Location: InMemoryFileIndex[file:/home/siddhesh/Documents/BAXA/csv_output/bulk_with_only_no], PartitionFilters: [], PushedFilters: [IsNotNull(CustomerID)], ReadSchema: struct<CustomerID:int,State:string>
>>>>>>>
>>>>>>> I know some of the operators: Project is like the select clause, and
>>>>>>> Filter corresponds to whatever filters we use in the query. Where can I
>>>>>>> look for the cost optimization in this plan? Suppose in the future my
>>>>>>> query takes longer to execute; by looking at this plan, how can I
>>>>>>> figure out what exactly is happening and what needs to be modified on
>>>>>>> the query side? Also, I can see from the plan that Spark uses
>>>>>>> sort-merge join by default, but when does it opt for a Sort-Merge Join
>>>>>>> and when for a Shuffle-Hash Join?
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Sid
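To make the sort-merge join question concrete, here is a minimal PySpark sketch; the tiny DataFrames, the app name, and the column name CustomerID are illustrative stand-ins, not taken from the thread. It builds the two key lists from the example (1, 2, 4, 10 on one side, 1 through 10 on the other), disables the broadcast threshold so Spark is likely to fall back to a sort-merge join, and prints the plan. In a sort-merge join each side is shuffled by a hash of the join key (the Exchange hashpartitioning node), sorted within each partition (the Sort node), and the two sorted streams are merged in a single forward pass, so neither table is re-scanned for every key; string keys such as "Department" have a natural ordering and sort lexicographically, so they work the same way.

from pyspark.sql import SparkSession

# Illustrative only: tiny DataFrames stand in for the huge tables in the question.
spark = (SparkSession.builder
         .appName("smj-sketch")
         .config("spark.sql.autoBroadcastJoinThreshold", "-1")  # rule out broadcast joins
         .getOrCreate())

left = spark.createDataFrame([(k,) for k in [1, 2, 4, 10]], ["CustomerID"])
right = spark.createDataFrame([(k,) for k in range(1, 11)], ["CustomerID"])

# Inner join on the key; the physical plan should show SortMergeJoin,
# with Exchange hashpartitioning and Sort on each side, as in the plan above.
left.join(right, "CustomerID").explain(mode="extended")

# Spark 3.x also accepts join strategy hints, e.g. to ask for a shuffle-hash join:
left.join(right.hint("SHUFFLE_HASH"), "CustomerID").explain()

As for when Spark picks which strategy: by default it prefers sort-merge join for equi-joins (spark.sql.join.preferSortMergeJoin is true by default); a shuffle-hash join is generally considered only when that preference is turned off or a SHUFFLE_HASH hint is given and one side is small enough to build a per-partition hash table, while a broadcast join is chosen when one side is below spark.sql.autoBroadcastJoinThreshold.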
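On the predicate pushdown point, the idea is already visible in the physical plan above: isnotnull(CustomerID) appears under PushedFilters in the FileScan node, meaning the condition is handed to the data source scan rather than applied only after a full read; with formats such as Parquet this can let the reader skip whole row groups using column statistics. A small hypothetical sketch follows; the path, columns, and filter value are made up for illustration.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Hypothetical dataset stored as Parquet, purely for illustration.
df = spark.read.parquet("/tmp/customers")

# The filter on State can be pushed down to the Parquet reader; the FileScan node
# in the physical plan should list it under PushedFilters, roughly:
#   PushedFilters: [IsNotNull(State), EqualTo(State,Goa)]
df.filter(df.State == "Goa").select("CustomerID", "CustomerName").explain()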