The min() code above generally worked fine for me, but it failed in one
corner case: when I have something like:
invoice_id=1, update_time=*2018-01-01 15:00:00.000*
invoice_id=1, update_time=*2018-01-01 15:00:00.000*
invoice_id=1, update_time=2018-02-03 14:00:00.000

In this example, two records have exactly the same update_time, so filtering
on the min() returns 2 records for invoice_id=1.
Your row_num snippet avoids this, because two rows in the same partition can
never both have row_num = 1.
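
To make the corner case concrete, here is a rough plain-Python sketch (not Spark code) of the two approaches on the three rows above. The `note` column and both helper functions are made up for illustration; timestamps are kept as strings, which compare correctly in this fixed format.

```python
# Plain-Python stand-in for the DataFrame logic; `note` is a hypothetical
# extra column used only to tell the two tied rows apart.
rows = [
    {"invoice_id": 1, "update_time": "2018-01-01 15:00:00.000", "note": "a"},
    {"invoice_id": 1, "update_time": "2018-01-01 15:00:00.000", "note": "b"},
    {"invoice_id": 1, "update_time": "2018-02-03 14:00:00.000", "note": "c"},
]


def filter_on_min(rows):
    """Mimic min(update_time) per invoice_id followed by an equality filter."""
    wanted = {}
    for r in rows:
        key = r["invoice_id"]
        if key not in wanted or r["update_time"] < wanted[key]:
            wanted[key] = r["update_time"]
    return [r for r in rows if r["update_time"] == wanted[r["invoice_id"]]]


def filter_on_row_num(rows):
    """Mimic row_number() over update_time per invoice_id, keeping row_num 1."""
    kept = []
    seen = set()
    for r in sorted(rows, key=lambda r: (r["invoice_id"], r["update_time"])):
        if r["invoice_id"] not in seen:  # first row of the partition
            seen.add(r["invoice_id"])
            kept.append(r)
    return kept


print(len(filter_on_min(rows)))      # both tied rows survive the filter
print(len(filter_on_row_num(rows)))  # exactly one row per invoice_id
```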

But note that with row_num, which of the two tied rows gets row_num=1 and
which gets row_num=2 is arbitrary (because the orderBy is on update_time and
both rows have the same update_time).
Hence dropDuplicates can be used for that case, because the row kept can be
either one of the tied rows.
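
A small plain-Python sketch of that ordering point (again not Spark; the `note` column and the `pick_first` helper are hypothetical): when the sort key is only update_time, which tied row comes first depends on the order the rows happen to arrive in, whereas a second key acting as a tie-breaker makes the choice reproducible. In Spark the analogous idea would be a second column in the orderBy, assuming the data has a column that can break the tie.

```python
def pick_first(rows, key):
    """Return the row that would get row_num == 1 under the given sort key."""
    # min() returns the first minimal element it encounters, so ties are
    # resolved by input order, much like an arbitrary order within a partition.
    return min(rows, key=key)


a = {"invoice_id": 1, "update_time": "2018-01-01 15:00:00.000", "note": "a"}
b = {"invoice_id": 1, "update_time": "2018-01-01 15:00:00.000", "note": "b"}
c = {"invoice_id": 1, "update_time": "2018-02-03 14:00:00.000", "note": "c"}


def by_time(r):
    return r["update_time"]


def by_time_then_note(r):
    # Hypothetical tie-breaker: fall back to `note` when update_time is equal.
    return (r["update_time"], r["note"])


# Same data, two arrival orders.
order_1 = [a, b, c]
order_2 = [b, a, c]

# Key on update_time only: the winner changes with the arrival order.
print(pick_first(order_1, by_time)["note"], pick_first(order_2, by_time)["note"])
# Key with a tie-breaker: the winner is the same for both orders.
print(pick_first(order_1, by_time_then_note)["note"],
      pick_first(order_2, by_time_then_note)["note"])
```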

Overall, dropDuplicates seems to be meant for cases where you literally have
redundant, duplicated data, and not for filtering to get the first/last
record.


On Thu, Apr 4, 2019 at 11:46 AM Chetan Khatri <chetan.opensou...@gmail.com>
wrote:

> Hello Abdeali, Thank you for your response.
>
> Can you please explain this line to me: "And the dropDuplicates at the end
> ensures records with two values for the same 'update_time' don't cause
> issues."
>
> Sorry, I didn't get it right away. :)
>
> On Thu, Apr 4, 2019 at 10:41 AM Abdeali Kothari <abdealikoth...@gmail.com>
> wrote:
>
>> I've faced this issue too, and a colleague pointed me to the
>> documentation:
>> https://spark.apache.org/docs/2.4.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.dropDuplicates
>> The dropDuplicates docs do not say that it is guaranteed to return the
>> "first" record (even if you sort your dataframe).
>> It gives you any record it finds and just ensures that duplicates are
>> not present.
>>
>> The only way I know of how to do this is what you did, but you can avoid
>> the sorting inside the partition with something like (in pyspark):
>>
>> from pyspark.sql import Window, functions as F
>> df = df.withColumn('wanted_time',
>>     F.min('update_time').over(Window.partitionBy('invoice_id')))
>> out_df = (df.filter(df['update_time'] == df['wanted_time'])
>>     .drop('wanted_time')
>>     .dropDuplicates(['invoice_id', 'update_time']))
>>
>> The min() is faster than doing an orderBy() and a row_number().
>> And the dropDuplicates at the end ensures records with two values for the
>> same 'update_time' don't cause issues.
>>
>>
>> On Thu, Apr 4, 2019 at 10:22 AM Chetan Khatri <
>> chetan.opensou...@gmail.com> wrote:
>>
>>> Hello Dear Spark Users,
>>>
>>> I am using dropDuplicates on a DataFrame generated from a large parquet
>>> file (from HDFS), deduplicating on a timestamp column. Every time I run
>>> it, it drops different rows among those with the same timestamp.
>>>
>>> What I tried, and what worked:
>>>
>>> val wSpec = Window.partitionBy($"invoice_id").orderBy($"update_time".desc)
>>>
>>> val irqDistinctDF = irqFilteredDF
>>>   .withColumn("rn", row_number.over(wSpec))
>>>   .where($"rn" === 1)
>>>   .drop("rn").drop("update_time")
>>>
>>> But this is damn slow...
>>>
>>> Can someone please shed some light on this?
>>>
>>> Thanks
>>>
>>>
