Sorry for the confusion between cume_dist and percent_rank. I was playing 
around with both to see whether the difference in computation mattered, and I 
must have copied in the percent_rank version accidentally. My requirement is 
to compute cume_dist. 

I have a dataframe with a number of columns (10+) on which I need to compute 
cume_dist, partitioned by the customer name, site name, and year_month 
columns. 

The columns I need to run cume_dist on can contain nulls, and I want to 
exclude those rows from the calculation. 

Ideally it would be nice to have an option in cume_dist to ignore nulls, e.g. 
F.cume_dist(ignoreNulls=True).over(…), but no such option exists.

I can create a new dataframe df2 from the filtered output of df, run 
cume_dist on it, and finally merge it back into df with a left join, roughly 
as in the sketch below. 
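
For a single column, that filter-then-join version would look something like 
this. It is only a sketch; it assumes there is a unique row identifier to 
join back on (the row_id column here is hypothetical):

from pyspark.sql import functions as F
from pyspark.sql.window import Window

win = (
    Window.partitionBy("p_customer_name", "p_site_name", "year_month")
    .orderBy(F.col("event_duration"))
)

# Compute cume_dist only on the non-null rows, then join the result back on
# row_id (a hypothetical unique key; without one, the join is not safe).
df2 = (
    df.filter(F.col("event_duration").isNotNull())
    .withColumn(
        "percentile_rank_evt_duration",
        F.round(F.cume_dist().over(win) * F.lit(100)).cast("integer"),
    )
    .select("row_id", "percentile_rank_evt_duration")
)

df = df.join(df2, on="row_id", how="left")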

The question is: how do I do this efficiently for a list of columns?

Writing this in a loop and joining at the end of each iteration feels 
counter-intuitive. Is there a better way to do this?
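
The best idea I have so far is to avoid the per-column join/union entirely. 
Since count() skips nulls, the null-excluding cume_dist can be written as a 
ratio of two window counts (non-null values <= the current value over total 
non-null values in the partition), and the loop only builds column 
expressions that are applied in a single select. A rough sketch, untested at 
scale, using the column names from my loop below:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

part_keys = ["p_customer_name", "p_site_name", "year_month"]

pairs = [
    ("event_duration", "percentile_rank_evt_duration"),
    ("event_duration_pred", "percentile_pred_evt_duration"),
    # ... remaining (source column, new column) pairs from the loop below ...
]

new_cols = []
for column_name, new_col in pairs:
    # With orderBy and no explicit frame, the frame is
    # RANGE UNBOUNDED PRECEDING .. CURRENT ROW, so count(column) is the
    # number of non-null values <= the current value (ties included).
    win_cume = Window.partitionBy(*part_keys).orderBy(F.col(column_name))
    # No orderBy: whole-partition frame, i.e. total non-null values.
    win_part = Window.partitionBy(*part_keys)

    cume_dist_ignoring_nulls = (
        F.count(F.col(column_name)).over(win_cume)
        / F.count(F.col(column_name)).over(win_part)
    )
    new_cols.append(
        F.when(F.col(column_name).isNull(), F.lit(None))
        .otherwise(F.round(cume_dist_ignoring_nulls * 100).cast("integer"))
        .alias(new_col)
    )

# A single select: the loop only builds expressions, so there are no
# per-column joins or unions in the plan.
df = df.select("*", *new_cols)

If that counting trick is valid, the whole thing stays as one projection 
instead of a chain of joins, but I would welcome corrections.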


> On Jan 11, 2022, at 11:53 PM, Gourav Sengupta <gourav.sengu...@gmail.com> 
> wrote:
> 
> 
> Hi,
> 
> I am not sure what you are trying to achieve here; are cume_dist and 
> percent_rank not different?
> 
> If I am following your question correctly, you are looking to filter out 
> NULLs before applying the function to the dataframe, and I think it will 
> be fine if you just create another dataframe first with the non-null values 
> and then apply the function to that dataframe.
> 
> It will be of much help if you can explain what you are trying to achieve 
> here. Applying loops over a dataframe, like you have done here, is surely 
> not recommended at all; please look at the explain plan of the dataframe in 
> each iteration to understand the effect of your loops on the plan - that 
> should give some details.
> 
> 
> Regards,
> Gourav Sengupta
> 
>> On Mon, Jan 10, 2022 at 10:49 PM Ramesh Natarajan <rames...@gmail.com> wrote:
>> I want to compute cume_dist on a bunch of columns in a spark dataframe, but 
>> want to remove NULL values before doing so. 
>> 
>> I have this loop in pyspark. While this works, I see the driver runs at 100% 
>> while the executors are idle for the most part. I am reading that running a 
>> loop is an anti-pattern and should be avoided. Any pointers on how to 
>> optimize this section of pyspark code?
>> 
>> I am running this on the AWS Glue 3.0 environment.
>> 
>> for column_name, new_col in [
>>         ("event_duration", "percentile_rank_evt_duration"),
>>         ("event_duration_pred", "percentile_pred_evt_duration"),
>>         ("alarm_cnt", "percentile_rank_alarm_cnt"),
>>         ("alarm_cnt_pred", "percentile_pred_alarm_cnt"),
>>         ("event_duration_adj", "percentile_rank_evt_duration_adj"),
>>         ("event_duration_adj_pred", "percentile_pred_evt_duration_adj"),
>>         ("encounter_time", "percentile_rank_encounter_time"),
>>         ("encounter_time_pred", "percentile_pred_encounter_time"),
>>         ("encounter_time_adj", "percentile_rank_encounter_time_adj"),
>>         ("encounter_time_adj_pred", "percentile_pred_encounter_time_adj"),
>>     ]:
>>         win = (
>>             Window().partitionBy(["p_customer_name", "p_site_name", "year_month"])
>>             .orderBy(col(column_name))
>>         )
>>         df1 = df.filter(F.col(column_name).isNull())
>>         df2 = df.filter(F.col(column_name).isNotNull()).withColumn(
>>             new_col, F.round(F.cume_dist().over(win) * lit(100)).cast("integer")
>>         )
>>         df = df2.unionByName(df1, allowMissingColumns=True)
>> 
>> For some reason this code seems to work faster, but it doesn't remove NULLs 
>> prior to computing the cume_dist. Not sure if this is also a proper way to 
>> do this :(
>> 
>> for column_name, new_col in [
>>         ("event_duration", "percentile_rank_evt_duration"),
>>         ("event_duration_pred", "percentile_pred_evt_duration"),
>>         ("alarm_cnt", "percentile_rank_alarm_cnt"),
>>         ("alarm_cnt_pred", "percentile_pred_alarm_cnt"),
>>         ("event_duration_adj", "percentile_rank_evt_duration_adj"),
>>         ("event_duration_adj_pred", "percentile_pred_evt_duration_adj"),
>>         ("encounter_time", "percentile_rank_encounter_time"),
>>         ("encounter_time_pred", "percentile_pred_encounter_time"),
>>         ("encounter_time_adj", "percentile_rank_encounter_time_adj"),
>>         ("encounter_time_adj_pred", "percentile_pred_encounter_time_adj"),
>>     ]:
>>         win = (
>>             Window().partitionBy(["p_customer_name", "p_site_name", "year_month"])
>>             .orderBy(col(column_name))
>>         )
>>         df = df.withColumn(
>>             new_col,
>>             F.when(F.col(column_name).isNull(), F.lit(None)).otherwise(
>>                 F.round(F.percent_rank().over(win) * lit(100)).cast("integer")
>>             ),
>>         )
>> 
>> I'd appreciate it if anyone has pointers on how to go about this.
>> 
>> thanks
>> Ramesh
