Thanks a lot, TD, this is exactly what I was looking for. I have also seen most of
your talks; really great stuff you guys are doing :)

On Thu, Feb 1, 2018 at 10:38 AM, Tathagata Das <tathagata.das1...@gmail.com> wrote:

> Hello Ayan,
>
> From what I understand, mapGroupsWithState (or, probably better, the more
> general flatMapGroupsWithState) is the best way forward, although neither is
> available in Python. However, you need to decide on the semantics you want:
> when should the deduplicated data be output from the streaming query? For
> example, given the following sequence of events:
>
> (id, last_update_timestamp, attribute)
> 1, 12:00, A   <---- do you want to output this immediately, or wait for
>                     some time to see whether newer data arrives?
> 1, 11:59, B   <---- ignored as a duplicate
> 1, 12:01, C   <---- do you want to output this?
> 1, 12:02, D
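The event sequence above can be replayed with a small plain-Python model (not Spark code; the function name `classify_events` and the tuple layout are assumptions for illustration). It remembers the latest timestamp seen per id and labels each event as the first record for its id, a newer record (a candidate for output), or an older one that gets ignored.

```python
from datetime import time


def classify_events(events):
    """Label each (id, last_update_timestamp, attribute) event against the
    latest timestamp seen so far for its id. Hypothetical model, not Spark."""
    latest = {}  # id -> latest last_update_timestamp seen so far
    labels = []
    for key, ts, attr in events:
        t = time.fromisoformat(ts)  # "HH:MM" -> datetime.time
        if key not in latest:
            labels.append((attr, "first"))    # first record for this id
            latest[key] = t
        elif t > latest[key]:
            labels.append((attr, "newer"))    # newer: a candidate for output
            latest[key] = t
        else:
            labels.append((attr, "ignored"))  # older or equal: treated as duplicate
    return labels


events = [(1, "12:00", "A"), (1, "11:59", "B"), (1, "12:01", "C"), (1, "12:02", "D")]
for attr, label in classify_events(events):
    print(attr, label)
```

Whether "first" and each "newer" record should be emitted immediately, or held back for a while, is exactly the semantic choice raised above.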
>
> If you want to output something every time there is a newer
> last_update_timestamp, then that's not really strict "deduplication"; it's
> more like an aggregation that keeps the latest record. In that case, you can
> also try UDAFs. However, with UDAFs you won't get any state cleanup. So
> flatMapGroupsWithState is the best solution: you can do whatever tracking
> you want, output whenever you want, and get state cleanup using timeouts.
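The keep-the-latest-plus-timeout idea can be pictured with a plain-Python model. This is not Spark's flatMapGroupsWithState API: the class name `LatestStateStore`, the `timeout` and `now` parameters, and the rule that any incoming record resets an id's timeout are assumptions made for illustration. Each id's state holds its latest record, a newer record is emitted, and ids idle longer than the timeout are evicted, which is the cleanup a UDAF-based aggregation would not give you.

```python
from datetime import datetime, timedelta


class LatestStateStore:
    """Hypothetical model (not the Spark API): keeps the latest
    (timestamp, attribute) per id and evicts ids with no data for longer
    than `timeout`."""

    def __init__(self, timeout):
        self.timeout = timeout
        self.state = {}      # id -> (latest_ts, attribute)
        self.last_seen = {}  # id -> time the id last received any record

    def update(self, key, ts, attr, now):
        """Process one record; return it if it should be emitted, else None."""
        self.last_seen[key] = now  # assumption: any record resets the timeout
        current = self.state.get(key)
        if current is None or ts > current[0]:
            self.state[key] = (ts, attr)
            return (key, ts, attr)  # first or newer record: emit it
        return None                 # older or equal: drop as a duplicate

    def expire(self, now):
        """Drop state for ids idle longer than the timeout; return their ids."""
        expired = [k for k, seen in self.last_seen.items() if now - seen > self.timeout]
        for k in expired:
            del self.state[k]
            del self.last_seen[k]
        return expired


store = LatestStateStore(timeout=timedelta(hours=24))
t0 = datetime(2018, 1, 29, 12, 0)
print(store.update(1, t0, "A", now=t0))                         # emitted
print(store.update(1, t0 - timedelta(minutes=1), "B", now=t0))  # None: older
print(store.update(1, t0 + timedelta(minutes=1), "C", now=t0))  # emitted: newer
print(store.expire(now=t0 + timedelta(hours=25)))               # [1]: idle > 24h
```

In Spark itself (Scala/Java), the per-id state would live in a `GroupState` and the eviction would be driven by a `GroupStateTimeout` rather than an explicit `expire` call.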
>
> FYI: I have elaborated on flatMapGroupsWithState and timeouts in my talk:
> https://databricks.com/session/deep-dive-into-stateful-stream-processing-in-structured-streaming
>
> On Tue, Jan 30, 2018 at 5:14 AM, ayan guha <guha.a...@gmail.com> wrote:
>
>> Any help would be much appreciated :)
>>
>> On Mon, Jan 29, 2018 at 6:25 PM, ayan guha <guha.a...@gmail.com> wrote:
>>
>>> Hi
>>>
>>> I want to write something in Structured Streaming:
>>>
>>> 1. I have a dataset with 3 columns: id, last_update_timestamp, attribute.
>>> 2. I am receiving the data through Kinesis.
>>>
>>> I want to deduplicate records by id, keeping the one with the latest
>>> last_updated. In batch, it looks like:
>>>
>>> spark.sql("""
>>>     SELECT * FROM (
>>>         SELECT *, row_number() OVER (PARTITION BY id ORDER BY last_updated DESC) AS rank
>>>         FROM table1
>>>     ) tmp
>>>     WHERE rank = 1
>>> """)
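For reference, what that batch query computes can be written in plain Python (an illustration of the semantics, not Spark code; `latest_per_id` and the dict-per-row format are assumptions): for each id, keep the row with the greatest last_updated. row_number() breaks ties arbitrarily; this sketch keeps the first row seen among equal timestamps.

```python
from datetime import datetime


def latest_per_id(rows):
    """Return one row per id: the row with the greatest last_updated.
    Mirrors row_number() OVER (PARTITION BY id ORDER BY last_updated DESC) = 1."""
    best = {}
    for row in rows:
        current = best.get(row["id"])
        # Strictly greater: among equal timestamps, the first row seen wins.
        if current is None or row["last_updated"] > current["last_updated"]:
            best[row["id"]] = row
    return list(best.values())


rows = [
    {"id": 1, "last_updated": datetime(2018, 1, 29, 12, 0), "attribute": "A"},
    {"id": 1, "last_updated": datetime(2018, 1, 29, 11, 59), "attribute": "B"},
    {"id": 2, "last_updated": datetime(2018, 1, 29, 9, 30), "attribute": "X"},
    {"id": 1, "last_updated": datetime(2018, 1, 29, 12, 2), "attribute": "D"},
]
print(sorted((r["id"], r["attribute"]) for r in latest_per_id(rows)))
```

The batch version sees all rows at once; the streaming version has to keep the per-id "best" entry as state across triggers, which is what the question below is about.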
>>>
>>> But now I would like to do it in Structured Streaming. I need to maintain
>>> the state of each id according to its highest last_updated, across
>>> triggers, for a certain period (24 hours).
>>>
>>> Questions:
>>>
>>> 1. Should I use mapGroupsWithState, or is there another (SQL?)
>>>    solution? Can anyone help me write it?
>>> 2. Is mapGroupsWithState supported in Python?
>>>
>>> Just to make sure we cover all bases: I have already tried
>>> dropDuplicates, but it keeps the first record encountered for an id
>>> instead of updating the state:
>>>
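>>> from pyspark.sql.functions import get_json_object
>>>
>>> unpackedDF = kinesisDF.selectExpr("cast (data as STRING) jsonData")
>>> dataDF = unpackedDF.select(
>>>     get_json_object(unpackedDF.jsonData, '$.header.id').alias('id'),
>>>     get_json_object(unpackedDF.jsonData, '$.header.last_updated')
>>>         .cast('timestamp').alias('last_updated'),
>>>     unpackedDF.jsonData)
>>>
>>> # A watermark only influences dropDuplicates when it is defined before it.
>>> # Even then, dropDuplicates keeps the first record seen per id.
>>> dedupDF = (dataDF.withWatermark('last_updated', '24 hours')
>>>                  .dropDuplicates(subset=['id']))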
>>>
>>>
>>> So it is not working. Any help is appreciated.
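The behavior described above can be reproduced with a plain-Python model of what dropDuplicates(subset=['id']) does on a stream (a simplification for illustration, not Spark's implementation; `keep_first_per_id` is a hypothetical name): the first record that arrives for an id is kept, and every later record for that id is discarded, even when its last_updated is newer.

```python
def keep_first_per_id(records):
    """Hypothetical model of dropDuplicates(subset=['id']) on a stream:
    the first record seen for an id is emitted; later ones are discarded,
    regardless of their timestamp."""
    seen = set()
    out = []
    for key, ts, attr in records:
        if key not in seen:
            seen.add(key)
            out.append((key, ts, attr))
    return out


arrivals = [(1, "12:00", "A"), (1, "11:59", "B"), (1, "12:01", "C")]
print(keep_first_per_id(arrivals))  # only A survives; the newer C is dropped
```

Nothing in this model ever compares timestamps, which is why a "keep the latest" requirement needs custom state handling instead.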
>>>
>>> --
>>> Best Regards,
>>> Ayan Guha
>>>
>>
>>
>
>


