Hi Pooja,

My main confusion is: if two events have the same transaction_id, how can
we tell whether it is a wanted value update or an unwanted duplicate?

MapState with a TTL can be used for deduplication, assuming a duplicate
will not arrive too long after the original event was processed.
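A minimal sketch of the idea (the Event POJO, the class and field names,
and the 24-hour TTL below are placeholders of mine to adapt, not
something from your job):

// Event.java -- a stand-in for your enriched event.
public class Event {
    public String transactionId;
    public String key1;
    public String key2;
    public double value;
}

// DedupeFunction.java
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Emits an event only the first time its transaction_id is seen for the
// current key; later copies are dropped until the TTL evicts the entry,
// so the state stays bounded.
public class DedupeFunction extends KeyedProcessFunction<String, Event, Event> {

    private transient MapState<String, Boolean> seenTxnIds;

    @Override
    public void open(Configuration parameters) {
        // The TTL must be longer than the largest expected gap between
        // an event and its duplicate; 24 hours is only an example.
        StateTtlConfig ttl = StateTtlConfig
                .newBuilder(Time.hours(24))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .build();

        MapStateDescriptor<String, Boolean> descriptor = new MapStateDescriptor<>(
                "seen-transaction-ids", String.class, Boolean.class);
        descriptor.enableTimeToLive(ttl);
        seenTxnIds = getRuntimeContext().getMapState(descriptor);
    }

    @Override
    public void processElement(Event event, Context ctx, Collector<Event> out)
            throws Exception {
        if (!seenTxnIds.contains(event.transactionId)) {
            seenTxnIds.put(event.transactionId, true);
            out.collect(event);
        }
    }
}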
In your job, the dedup step would then sit between the Kinesis source and
the keyBy()/sum() aggregation.
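For example (again only a sketch; "events" stands for the
DataStream<Event> coming from your flink-kinesis source):

DataStream<Event> deduped = events
        .keyBy(e -> e.key1 + "|" + e.key2)   // dedup scope = aggregation key
        .process(new DedupeFunction());

deduped
        .keyBy(e -> e.key1 + "|" + e.key2)
        .sum("value");

Keying the dedup step by the aggregation key works because duplicates of
a transaction carry the same key1/key2; you could equally key it by
transaction_id itself.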
Thanks,
Zhu Zhu

On Wed, Dec 18, 2019 at 3:50 PM, Rafi Aroch <rafi.ar...@gmail.com> wrote:

> Hi Pooja,
>
> Here's an implementation from Jamie Grier for de-duplication using an
> in-memory cache with an expiration time:
>
> https://github.com/jgrier/FilteringExample/blob/master/src/main/java/com/dataartisans/DedupeFilteringJob.java
>
> If, for your use case, you can limit the time period in which duplicates
> may happen, you can use this approach.
>
> Thanks,
> Rafi
>
>
> On Wed, Dec 18, 2019 at 8:16 AM Pooja Agrawal <pooja.ag...@gmail.com>
> wrote:
>
>> Hey,
>>
>> I am sorry for the confusion. The value is not already present in the
>> event; we are reading it from a static table (a kind of data enrichment
>> in the Flink job). The event above is an enriched event.
>> If we say this is a transaction event, the user did the transaction
>> once, so the transaction_id is unique. But the table from which we read
>> the value may contain a wrong value (not always, sometimes because of a
>> bug). In that case we may want to reprocess the transaction event with
>> the new value (here the transaction_id will be the same as before, but
>> the value will change). I hope this clears up the scenario. Let me know
>> if you have any other questions.
>>
>> To solve the idempotency problem, you suggested maintaining a set
>> recording transaction_id(s). Since we are aggregating over all events
>> seen till now, the number of events, and hence of ids, will be too
>> large. I am assuming we will need some external store here and a lookup
>> every time we process an event, which may increase latency. Can you
>> suggest an efficient way to solve this? And if we do need an external
>> store, what would be the best candidate?
>>
>> Thanks
>> Pooja
>>
>>
>> On Wed, Dec 18, 2019 at 8:19 AM Zhu Zhu <reed...@gmail.com> wrote:
>>
>>> Hi Pooja,
>>>
>>> I'm a bit confused, since 1) says "If two events have same
>>> transaction_id, we can say that they are duplicates", while 2) says
>>> "Since this is just a value change, the transaction_id will be the
>>> same". These look conflicting to me. Usually, in scenario 2), the
>>> value-update event is considered a new event that does not share its
>>> unique id with prior events.
>>>
>>> If each event has a unique transaction_id, you can use it to
>>> de-duplicate the events via a set recording the transaction_id(s) that
>>> have already been processed. Then 2) would not be a problem, under the
>>> unique-transaction_id assumption.
>>>
>>> Thanks,
>>> Zhu Zhu
>>>
>>> On Tue, Dec 17, 2019 at 9:17 PM, Pooja Agrawal <pooja.ag...@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> I have a use case where we are reading events from a Kinesis stream.
>>>> An event can look like this:
>>>>
>>>> Event {
>>>>   event_id,
>>>>   transaction_id,
>>>>   key1,
>>>>   key2,
>>>>   value,
>>>>   timestamp,
>>>>   some other fields...
>>>> }
>>>>
>>>> We want to aggregate the values per key over all events seen till now
>>>> (as simple as "select key1, key2, sum(value) from events group by
>>>> key1, key2"). For this I have created a simple Flink job which uses
>>>> the flink-kinesis connector and applies keyBy() and sum() on the
>>>> incoming events. I am facing two challenges here:
>>>>
>>>> 1) The incoming events can have duplicates. How can we maintain
>>>> exactly-once processing here, as processing duplicate events can give
>>>> erroneous results? The field transaction_id will be unique for each
>>>> event. If two events have the same transaction_id, we can say that
>>>> they are duplicates. (By duplicates I don't just mean the retried
>>>> ones; the same message can be present in Kinesis with different
>>>> sequence numbers. I am not sure the flink-kinesis connector can
>>>> handle that, as it is using the KCL underneath, which I assume
>>>> doesn't take care of it.)
>>>>
>>>> 2) There can be cases where the value has been updated for a key
>>>> after the event was processed, and we may want to reprocess those
>>>> events with the new value. Since this is just a value change, the
>>>> transaction_id will be the same, and the idempotency logic will not
>>>> allow the events to be reprocessed. What are the ways to handle such
>>>> scenarios in Flink?
>>>>
>>>> Thanks
>>>> Pooja
>>>>
>>>>
>>>> --
>>>> Warm Regards,
>>>> Pooja Agrawal
>>>
>>
>> --
>> Warm Regards,
>> Pooja Agrawal