Hi,

I think that the best option is to use the Spark UI. In Spark 3.x the UI
and its additional settings are very good. Also have a look at the settings
for Adaptive Query Execution (AQE) in Spark; under certain conditions it
really works wonders.
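
As a minimal sketch (the values shown are illustrative, not tuned
recommendations, and the app name is hypothetical), enabling AQE and its
skew handling in PySpark looks roughly like this:

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("aqe-example")  # hypothetical app name
    # AQE is enabled by default from Spark 3.2; set explicitly for clarity
    .config("spark.sql.adaptive.enabled", "true")
    # split skewed partitions of sort-merge joins at runtime
    .config("spark.sql.adaptive.skewJoin.enabled", "true")
    # coalesce many small shuffle partitions after an exchange
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
    .getOrCreate()
)

With AQE on, the plan printed by explain() before execution can differ from
the final AdaptiveSparkPlan shown in the SQL tab of the UI after the query
has actually run.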

For long queries the plan can be quite cryptic and difficult to read.
It depends on how you finally trigger the action that executes the query,
whether you are using Spark DataFrames or Spark SQL, the Spark settings
(have a look at the configuration options in Spark 3.x), and a few other
aspects.
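
One thing that may help with readability (a small sketch, assuming Spark 3.x
and that df and df1 from your example below also exist as DataFrame
variables): the "formatted" explain mode prints a condensed operator tree
with the node details listed separately, and the equivalent DataFrame query
should produce the same optimized and physical plans as the SQL version, so
the two are easy to compare.

from pyspark.sql import functions as F

# SQL version, condensed plan output (mode="formatted" is Spark 3.0+)
spark.sql("""
    select a.CustomerID, a.CustomerName, b.State
    from df a join df1 b on a.CustomerID = b.CustomerID
""").explain(mode="formatted")

# equivalent DataFrame version; optimized/physical plans should match
a, b = df.alias("a"), df1.alias("b")
(a.join(b, F.col("a.CustomerID") == F.col("b.CustomerID"))
   .select("a.CustomerID", "a.CustomerName", "b.State")
   .explain(mode="formatted"))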

Regards,
Gourav Sengupta

On Sun, Feb 20, 2022 at 7:32 PM Sid Kal <flinkbyhe...@gmail.com> wrote:

> Hi Gourav,
>
> Right now I am just trying to understand the query execution plan by
> executing a simple join example via Spark SQL. The overall goal is to
> understand these plans so that, going forward, if my query runs slow due to
> data skew or some other issue, I can at least understand what exactly is
> happening on the driver and executor sides, much like in MapReduce.
>
> On Sun, Feb 20, 2022 at 9:06 PM Gourav Sengupta <gourav.sengu...@gmail.com>
> wrote:
>
>> Hi,
>>
>> What are you trying to achieve by this?
>>
>> If there is a performance deterioration, try to collect the query
>> execution runtime statistics from Spark SQL. They can be seen in the SQL
>> tab of the Spark UI and, if I am not wrong, are also available over the
>> REST API.
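>>
>> As a rough sketch (app_id and the port are placeholders, and I am quoting
>> the /sql endpoint and field names from memory, so please verify against
>> the monitoring REST API docs for your version), something like this pulls
>> the per-query runtime statistics:
>>
>> import requests  # third-party package, assumed to be installed
>>
>> app_id = "app-20220220-0001"           # placeholder; see sc.applicationId
>> base = "http://localhost:4040/api/v1"  # driver UI; adjust host/port
>>
>> # one entry per executed SQL query, with status and duration
>> for q in requests.get(f"{base}/applications/{app_id}/sql").json():
>>     print(q["id"], q["status"], q["duration"])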
>>
>> Please ensure that you are not trying to over-automate things.
>>
>> Reading up on how to interpret the plans may be useful, depending on what
>> you are trying to do.
>>
>>
>> Regards,
>> Gourav Sengupta
>>
>> On Sat, Feb 19, 2022 at 10:00 AM Sid Kal <flinkbyhe...@gmail.com> wrote:
>>
>>> I wrote a query like below and I am trying to understand its query
>>> execution plan.
>>>
>>> >>> spark.sql("select a.CustomerID,a.CustomerName,b.state from df a join
>>> df1 b on a.CustomerID=b.CustomerID").explain(mode="extended")
>>> == Parsed Logical Plan ==
>>> 'Project ['a.CustomerID, 'a.CustomerName, 'b.state]
>>> +- 'Join Inner, ('a.CustomerID = 'b.CustomerID)
>>>    :- 'SubqueryAlias a
>>>    :  +- 'UnresolvedRelation [df], [], false
>>>    +- 'SubqueryAlias b
>>>       +- 'UnresolvedRelation [df1], [], false
>>>
>>> == Analyzed Logical Plan ==
>>> CustomerID: int, CustomerName: string, state: string
>>> Project [CustomerID#640, CustomerName#641, state#988]
>>> +- Join Inner, (CustomerID#640 = CustomerID#978)
>>>    :- SubqueryAlias a
>>>    :  +- SubqueryAlias df
>>>    :     +-
>>> Relation[CustomerID#640,CustomerName#641,Pincode#642,DOB#643,CompletedAddressWithPincode#644,ContactDetailsPhone1#645,ContactDetailsPhone2#646,ContactDetailsMobile#647,ContactDetailsEmail#648,City#649,State#650,ComplaintFlag#651,ProfileCompletenessPercentage#652,WhatsAppOptIn#653,HNINonHNI#654,S2SFlag#655]
>>> csv
>>>    +- SubqueryAlias b
>>>       +- SubqueryAlias df1
>>>          +-
>>> Relation[CustomerID#978,CustomerName#979,Pincode#980,DOB#981,CompletedAddressWithPincode#982,ContactDetailsPhone1#983,ContactDetailsPhone2#984,ContactDetailsMobile#985,ContactDetailsEmail#986,City#987,State#988,ComplaintFlag#989,ProfileCompletenessPercentage#990,WhatsAppOptIn#991,HNINonHNI#992,S2SFlag#993]
>>> csv
>>>
>>> == Optimized Logical Plan ==
>>> Project [CustomerID#640, CustomerName#641, state#988]
>>> +- Join Inner, (CustomerID#640 = CustomerID#978)
>>>    :- Project [CustomerID#640, CustomerName#641]
>>>    :  +- Filter isnotnull(CustomerID#640)
>>>    :     +-
>>> Relation[CustomerID#640,CustomerName#641,Pincode#642,DOB#643,CompletedAddressWithPincode#644,ContactDetailsPhone1#645,ContactDetailsPhone2#646,ContactDetailsMobile#647,ContactDetailsEmail#648,City#649,State#650,ComplaintFlag#651,ProfileCompletenessPercentage#652,WhatsAppOptIn#653,HNINonHNI#654,S2SFlag#655]
>>> csv
>>>    +- Project [CustomerID#978, State#988]
>>>       +- Filter isnotnull(CustomerID#978)
>>>          +-
>>> Relation[CustomerID#978,CustomerName#979,Pincode#980,DOB#981,CompletedAddressWithPincode#982,ContactDetailsPhone1#983,ContactDetailsPhone2#984,ContactDetailsMobile#985,ContactDetailsEmail#986,City#987,State#988,ComplaintFlag#989,ProfileCompletenessPercentage#990,WhatsAppOptIn#991,HNINonHNI#992,S2SFlag#993]
>>> csv
>>>
>>> == Physical Plan ==
>>> *(5) Project [CustomerID#640, CustomerName#641, state#988]
>>> +- *(5) SortMergeJoin [CustomerID#640], [CustomerID#978], Inner
>>>    :- *(2) Sort [CustomerID#640 ASC NULLS FIRST], false, 0
>>>    :  +- Exchange hashpartitioning(CustomerID#640, 200),
>>> ENSURE_REQUIREMENTS, [id=#451]
>>>    :     +- *(1) Filter isnotnull(CustomerID#640)
>>>    :        +- FileScan csv [CustomerID#640,CustomerName#641] Batched:
>>> false, DataFilters: [isnotnull(CustomerID#640)], Format: CSV, Location:
>>> InMemoryFileIndex[file:/home/siddhesh/Documents/BAXA/csv_output/bulk_with_only_no],
>>> PartitionFilters: [], PushedFilters: [IsNotNull(CustomerID)], ReadSchema:
>>> struct<CustomerID:int,CustomerName:string>
>>>    +- *(4) Sort [CustomerID#978 ASC NULLS FIRST], false, 0
>>>       +- Exchange hashpartitioning(CustomerID#978, 200),
>>> ENSURE_REQUIREMENTS, [id=#459]
>>>          +- *(3) Filter isnotnull(CustomerID#978)
>>>             +- FileScan csv [CustomerID#978,State#988] Batched: false,
>>> DataFilters: [isnotnull(CustomerID#978)], Format: CSV, Location:
>>> InMemoryFileIndex[file:/home/siddhesh/Documents/BAXA/csv_output/bulk_with_only_no],
>>> PartitionFilters: [], PushedFilters: [IsNotNull(CustomerID)], ReadSchema:
>>> struct<CustomerID:int,State:string>
>>>
>>> I know some of the operators: Project is like the select clause, and
>>> Filter corresponds to whatever filters we use in the query. Where can I
>>> look for the cost optimization in this plan? Suppose in the future my
>>> query takes longer to execute; by looking at this plan, how can I figure
>>> out what exactly is happening and what needs to be modified on the query
>>> side? Also, Spark uses sort-merge join by default, as I can see from the
>>> plan, but when does it opt for Sort-Merge Join and when does it opt for
>>> Shuffle-Hash Join?
>>>
>>> Thanks,
>>> Sid
>>>
>>>