So, the above code for min() worked fine for me in general, but there was one corner case where it failed: when I have something like:

invoice_id=1, update_time=*2018-01-01 15:00:00.000*
invoice_id=1, update_time=*2018-01-01 15:00:00.000*
invoice_id=1, update_time=2018-02-03 14:00:00.000
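To make the corner case concrete, here is a minimal sketch (the data is the sample above; the session setup and the to_timestamp conversion are illustrative assumptions) showing that filtering on the windowed min() keeps both of the tied rows:

from pyspark.sql import SparkSession, Window, functions as F

spark = SparkSession.builder.getOrCreate()

# Hypothetical data reproducing the corner case above: two rows share the
# minimum update_time for invoice_id=1.
df = spark.createDataFrame(
    [(1, "2018-01-01 15:00:00.000"),
     (1, "2018-01-01 15:00:00.000"),
     (1, "2018-02-03 14:00:00.000")],
    ["invoice_id", "update_time"],
).withColumn("update_time", F.to_timestamp("update_time"))

df = df.withColumn("wanted_time",
                   F.min("update_time").over(Window.partitionBy("invoice_id")))

# Both tied rows pass this filter, so invoice_id=1 still shows up twice.
df.filter(F.col("update_time") == F.col("wanted_time")).show(truncate=False)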
In this example, the update_time for 2 records is exactly the same. So, filtering for the min() results in 2 records for invoice_id=1.
This is avoided in your row_number snippet, because 2 rows will never both have row_num = 1. But note that here, row_num=1 and row_num=2 will be assigned arbitrarily between the tied rows (because the orderBy is on update_time and they have the same update_time value). Hence dropDuplicates can be used there, because either one of those rows is fine.

Overall, dropDuplicates seems like it's meant for cases where you literally have redundant, duplicated data, and not for filtering to get first/last etc. (A combined sketch of the min() + dropDuplicates approach follows after the quoted thread below.)

On Thu, Apr 4, 2019 at 11:46 AM Chetan Khatri <chetan.opensou...@gmail.com> wrote:

> Hello Abdeali, thank you for your response.
>
> Can you please explain this line to me: "And the dropDuplicates at the end
> ensures records with two values for the same 'update_time' don't cause
> issues."
>
> Sorry, I didn't get it at first. :)
>
> On Thu, Apr 4, 2019 at 10:41 AM Abdeali Kothari <abdealikoth...@gmail.com>
> wrote:
>
>> I've faced this issue too - and a colleague pointed me to the
>> documentation -
>> https://spark.apache.org/docs/2.4.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.dropDuplicates
>> The dropDuplicates docs do not say that it guarantees returning the
>> "first" record (even if you sort your dataframe).
>> It gives you whichever record it finds and just ensures that duplicates
>> are not present.
>>
>> The only way I know of to do this is what you did, but you can avoid the
>> sorting inside the partition with something like this (in pyspark):
>>
>> from pyspark.sql import Window, functions as F
>> df = df.withColumn('wanted_time',
>>     F.min('update_time').over(Window.partitionBy('invoice_id')))
>> out_df = (df.filter(df['update_time'] == df['wanted_time'])
>>             .drop('wanted_time')
>>             .dropDuplicates(['invoice_id', 'update_time']))
>>
>> The min() is faster than doing an orderBy() and a row_number().
>> And the dropDuplicates at the end ensures that records with two values
>> for the same 'update_time' don't cause issues.
>>
>> On Thu, Apr 4, 2019 at 10:22 AM Chetan Khatri <
>> chetan.opensou...@gmail.com> wrote:
>>
>>> Hello Dear Spark Users,
>>>
>>> I am using dropDuplicates on a DataFrame generated from a large parquet
>>> file (from HDFS), deduplicating on a timestamp column, and every time I
>>> run it, it drops different rows for the same timestamp.
>>>
>>> What I tried, and what worked:
>>>
>>> val wSpec = Window.partitionBy($"invoice_id").orderBy($"update_time".desc)
>>>
>>> val irqDistinctDF = irqFilteredDF.withColumn("rn", row_number.over(wSpec))
>>>   .where($"rn" === 1)
>>>   .drop("rn").drop("update_time")
>>>
>>> But this is damn slow...
>>>
>>> Can someone please shed some light?
>>>
>>> Thanks
>>>
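
Combined sketch, as referenced above. This continues the hypothetical df from the sketch near the top of this mail (column names are from this thread); dropDuplicates on the (invoice_id, update_time) subset collapses the tied rows to one arbitrarily chosen row, which is acceptable here because either of them will do:

# Keep one row per invoice at its minimum update_time, even when that
# minimum timestamp is tied across several rows.
out_df = (df.filter(F.col("update_time") == F.col("wanted_time"))
            .dropDuplicates(["invoice_id", "update_time"])
            .drop("wanted_time"))
out_df.show(truncate=False)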