Thanks a lot TD, exactly what I was looking for. And I have seen most of your talks; really great stuff you guys are doing :)
On Thu, Feb 1, 2018 at 10:38 AM, Tathagata Das <tathagata.das1...@gmail.com> wrote:

> Hello Ayan,
>
> From what I understand, mapGroupsWithState (or probably the more general
> flatMapGroupsWithState) is the best way forward (not available in Python).
> However, you need to figure out your desired semantics of when you want
> to output the deduplicated data from the streaming query. For example,
> suppose there is the following sequence of events:
>
> (id, last_update_timestamp, attribute)
> 1, 12:00, A  <---- do you want to output this immediately, or wait for
>                    some time to see if there is newer data?
> 1, 11:59, B  <---- ignored as a duplicate
> 1, 12:01, C  <---- do you want to output this?
> 1, 12:02, D
>
> If you want to output something every time there is a newer
> last_update_timestamp, then that's not really strict "deduplication".
> It's more like an aggregation that keeps the latest value. In that case,
> you can try using UDAFs as well. However, with UDAFs you won't get any
> state cleanup. So flatMapGroupsWithState is the best solution, as you can
> do whatever tracking you want, output whenever you want, and get state
> cleanup using timeouts.
>
> FYI: I have elaborated on flatMapGroupsWithState and timeouts in my talk:
> https://databricks.com/session/deep-dive-into-stateful-stream-processing-in-structured-streaming
>
> On Tue, Jan 30, 2018 at 5:14 AM, ayan guha <guha.a...@gmail.com> wrote:
>
>> Any help would be much appreciated :)
>>
>> On Mon, Jan 29, 2018 at 6:25 PM, ayan guha <guha.a...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I want to write something in Structured Streaming:
>>>
>>> 1. I have a dataset which has 3 columns: id, last_update_timestamp,
>>>    attribute
>>> 2. I am receiving the data through Kinesis
>>>
>>> I want to deduplicate records based on last_updated. In batch, it
>>> looks like:
>>>
>>> spark.sql("""
>>>     select * from (
>>>         select *, row_number() over (partition by id order by last_updated desc) rank
>>>         from table1
>>>     ) tmp where rank = 1
>>> """)
>>>
>>> But now I would like to do it in Structured Streaming. I need to
>>> maintain the state of each id as per the highest last_updated, across
>>> triggers, for a certain period (24 hours).
>>>
>>> Questions:
>>>
>>> 1. Should I use mapGroupsWithState, or is there any other (SQL?)
>>>    solution? Can anyone help me write it?
>>> 2. Is mapGroupsWithState supported in Python?
>>>
>>> Just to ensure we cover the bases: I have already tried dropDuplicates,
>>> but it keeps the first record encountered for an id instead of updating
>>> the state:
>>>
>>> unpackedDF = kinesisDF.selectExpr("cast (data as STRING) jsonData")
>>> dataDF = unpackedDF.select(
>>>     get_json_object(unpackedDF.jsonData, '$.header.id').alias('id'),
>>>     get_json_object(unpackedDF.jsonData, '$.header.last_updated').cast('timestamp').alias('last_updated'),
>>>     unpackedDF.jsonData)
>>>
>>> dedupDF = dataDF.dropDuplicates(subset=['id']).withWatermark('last_updated', '24 hours')
>>>
>>> So it is not working. Any help is appreciated.
>>>
>>> --
>>> Best Regards,
>>> Ayan Guha

--
Best Regards,
Ayan Guha
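[Editor's note] To make the distinction in TD's reply concrete, here is a plain-Python sketch (not Spark code; the function names are illustrative) contrasting the two semantics: "first wins", which is what dropDuplicates on id gives you, versus "latest wins", the keep-the-newest-timestamp aggregation Ayan actually wants.

```python
from datetime import datetime

# Toy event stream: (id, last_updated, attribute). Timestamps arrive out
# of order, as in TD's example above.
events = [
    (1, datetime(2018, 2, 1, 12, 0), "A"),
    (1, datetime(2018, 2, 1, 11, 59), "B"),
    (1, datetime(2018, 2, 1, 12, 1), "C"),
    (1, datetime(2018, 2, 1, 12, 2), "D"),
]

def first_wins(events):
    """dropDuplicates(subset=['id']) semantics: keep the first row seen per id."""
    state = {}
    for key, ts, attr in events:
        state.setdefault(key, (ts, attr))  # ignores all later rows for key
    return state

def latest_wins(events):
    """Keep-the-latest semantics: retain the row with the max last_updated per id."""
    state = {}
    for key, ts, attr in events:
        if key not in state or ts > state[key][0]:
            state[key] = (ts, attr)  # newer timestamp replaces the stored row
    return state

print(first_wins(events)[1][1])   # A  -- why dropDuplicates never updates
print(latest_wins(events)[1][1])  # D  -- the desired deduplicated result
```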
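[Editor's note] Since flatMapGroupsWithState is only available in Scala/Java, here is a plain-Python sketch of the per-key state logic such a function would implement: keep the newest (last_updated, attribute) per id, emit only when a newer timestamp arrives, and evict keys after 24 hours (the role a timeout plays in Spark's state cleanup). The class and constant names are made up for illustration; this is not a Spark API.

```python
from datetime import datetime, timedelta

STATE_TTL = timedelta(hours=24)  # how long to remember a key (assumption from the thread)

class LatestPerKey:
    """Sketch of the state flatMapGroupsWithState would hold per id."""

    def __init__(self):
        self.state = {}  # id -> (last_updated, attribute)

    def update(self, key, ts, attr):
        """Process one event; return the row to emit, or None for a duplicate."""
        current = self.state.get(key)
        if current is None or ts > current[0]:
            self.state[key] = (ts, attr)
            return (key, ts, attr)  # newer last_updated: emit downstream
        return None                 # older or equal timestamp: ignore

    def expire(self, now):
        """Drop keys not updated within the TTL (state cleanup on timeout)."""
        self.state = {k: v for k, v in self.state.items()
                      if now - v[0] <= STATE_TTL}

# Usage: feed TD's example events through the state machine.
s = LatestPerKey()
print(s.update(1, datetime(2018, 2, 1, 12, 0), "A"))   # emitted (first seen)
print(s.update(1, datetime(2018, 2, 1, 11, 59), "B"))  # None (older duplicate)
print(s.update(1, datetime(2018, 2, 1, 12, 2), "D"))   # emitted (newer)
s.expire(datetime(2018, 2, 3, 0, 0))                   # > 24h later: key evicted
print(s.state)                                         # {}
```

Whether update emits immediately or buffers until the timeout fires is exactly the semantic choice TD raises; this sketch picks "emit on every newer timestamp".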