Sorry for the confusion between cume_dist and percent_rank. I was playing around with both to see whether the difference in how they are computed mattered for my results, and I must have copied the percent_rank version accidentally. My requirement is to compute cume_dist.
I have a dataframe with a bunch of columns (10+) on which I need to compute cume_dist, partitioned by the customer name, site name and year_month columns. The columns I need to run cume_dist on can contain nulls, and I want to exclude those rows from the calculation. Ideally there would be an option in cume_dist to ignore nulls, something like F.cume_dist(ignoreNulls=True).over(…), but no such option exists. I could create a new dataframe df2 as a filtered subset of df, run cume_dist on it, and then merge it back into df with a left join (a rough sketch of this idea is at the bottom of this mail, below the quoted thread). The question is how to do this efficiently for a list of columns. Writing this in a loop and joining at the end of each iteration feels counter-intuitive. Is there a better way to do this?

> On Jan 11, 2022, at 11:53 PM, Gourav Sengupta <gourav.sengu...@gmail.com> wrote:
>
> Hi,
>
> I am not sure what you are trying to achieve here; are cume_dist and percent_rank not different?
>
> If I am able to follow your question correctly, you are looking for filtering out NULLs before applying the function on the dataframe, and I think it will be fine if you just create another dataframe first with the non-null values and then apply the function to that dataframe.
>
> It will be of much help if you can explain what you are trying to achieve here. Applying loops on a dataframe, like you have done, is surely not recommended at all; please see the explain plan of the dataframe in each iteration to understand the effect of your loops on the explain plan - that should give some details.
>
> Regards,
> Gourav Sengupta
>
>> On Mon, Jan 10, 2022 at 10:49 PM Ramesh Natarajan <rames...@gmail.com> wrote:
>>
>> I want to compute cume_dist on a bunch of columns in a spark dataframe, but want to remove NULL values before doing so.
>>
>> I have this loop in pyspark. While this works, I see the driver runs at 100% while the executors are idle for the most part. I am reading that running a loop is an anti-pattern and should be avoided. Any pointers on how to optimize this section of pyspark code?
>>
>> I am running this on the AWS Glue 3.0 environment.
>>
>> for column_name, new_col in [
>>     ("event_duration", "percentile_rank_evt_duration"),
>>     ("event_duration_pred", "percentile_pred_evt_duration"),
>>     ("alarm_cnt", "percentile_rank_alarm_cnt"),
>>     ("alarm_cnt_pred", "percentile_pred_alarm_cnt"),
>>     ("event_duration_adj", "percentile_rank_evt_duration_adj"),
>>     ("event_duration_adj_pred", "percentile_pred_evt_duration_adj"),
>>     ("encounter_time", "percentile_rank_encounter_time"),
>>     ("encounter_time_pred", "percentile_pred_encounter_time"),
>>     ("encounter_time_adj", "percentile_rank_encounter_time_adj"),
>>     ("encounter_time_adj_pred", "percentile_pred_encounter_time_adj"),
>> ]:
>>     win = (
>>         Window.partitionBy(["p_customer_name", "p_site_name", "year_month"])
>>         .orderBy(F.col(column_name))
>>     )
>>     # Split off the null rows, compute cume_dist over the non-null rows only,
>>     # then union the two parts back together.
>>     df1 = df.filter(F.col(column_name).isNull())
>>     df2 = df.filter(F.col(column_name).isNotNull()).withColumn(
>>         new_col, F.round(F.cume_dist().over(win) * F.lit(100)).cast("integer")
>>     )
>>     df = df2.unionByName(df1, allowMissingColumns=True)
>>
>> For some reason this code seems to work faster, but it doesn't remove NULLs prior to computing the cume_dist.
>> Not sure if this is also a proper way to do this :(
>>
>> for column_name, new_col in [
>>     ("event_duration", "percentile_rank_evt_duration"),
>>     ("event_duration_pred", "percentile_pred_evt_duration"),
>>     ("alarm_cnt", "percentile_rank_alarm_cnt"),
>>     ("alarm_cnt_pred", "percentile_pred_alarm_cnt"),
>>     ("event_duration_adj", "percentile_rank_evt_duration_adj"),
>>     ("event_duration_adj_pred", "percentile_pred_evt_duration_adj"),
>>     ("encounter_time", "percentile_rank_encounter_time"),
>>     ("encounter_time_pred", "percentile_pred_encounter_time"),
>>     ("encounter_time_adj", "percentile_rank_encounter_time_adj"),
>>     ("encounter_time_adj_pred", "percentile_pred_encounter_time_adj"),
>> ]:
>>     win = (
>>         Window.partitionBy(["p_customer_name", "p_site_name", "year_month"])
>>         .orderBy(F.col(column_name))
>>     )
>>     # Null rows get a null output, but they are not filtered out of the window,
>>     # so they still count towards the rank denominator.
>>     df = df.withColumn(
>>         new_col,
>>         F.when(F.col(column_name).isNull(), F.lit(None)).otherwise(
>>             F.round(F.percent_rank().over(win) * F.lit(100)).cast("integer")
>>         ),
>>     )
>>
>> Appreciate it if anyone has any pointers on how to go about this.
>>
>> thanks
>> Ramesh
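
For reference, this is roughly the shape of the filter / cume_dist / left-join idea I mentioned above, for a single column. It is an untested sketch; "row_id" is just a placeholder surrogate key I would have to add (here via monotonically_increasing_id) so the result can be joined back.

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Placeholder unique key so the cume_dist result can be joined back onto df.
df = df.withColumn("row_id", F.monotonically_increasing_id())

column_name, new_col = "event_duration", "percentile_rank_evt_duration"

win = (
    Window.partitionBy("p_customer_name", "p_site_name", "year_month")
    .orderBy(F.col(column_name))
)

# cume_dist computed over the non-null rows only ...
df2 = (
    df.filter(F.col(column_name).isNotNull())
      .select(
          "row_id",
          F.round(F.cume_dist().over(win) * F.lit(100)).cast("integer").alias(new_col),
      )
)

# ... then joined back so rows with a null value keep a null percentile.
df = df.join(df2, on="row_id", how="left")

Doing this once per column means 10+ joins against the full dataframe, which is exactly the part that feels wrong to me.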