Hi Pooja,

Here's an implementation from Jamie Grier for de-duplication using an in-memory cache with an expiration time: https://github.com/jgrier/FilteringExample/blob/master/src/main/java/com/dataartisans/DedupeFilteringJob.java
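To make that idea concrete, here is a rough sketch of the same kind of filter, using Flink keyed state with a state TTL rather than the in-memory cache the linked job uses. The DedupeFilter name, the Event/transactionId placeholders in the usage comment, and the one-hour retention are mine, not taken from the example:

import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;

// Drops events whose transaction_id has already been seen within the TTL window.
// Apply it after keyBy so the "seen" flag is scoped per transaction id:
//
//   DataStream<Event> deduped = events
//       .keyBy(e -> e.transactionId)      // Event / transactionId are placeholders
//       .filter(new DedupeFilter<>());
//
public class DedupeFilter<T> extends RichFilterFunction<T> {

    private transient ValueState<Boolean> seen;

    @Override
    public void open(Configuration parameters) {
        // Keep an id only as long as duplicates can realistically arrive,
        // so the state does not grow forever.
        StateTtlConfig ttl = StateTtlConfig
                .newBuilder(Time.hours(1))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .build();

        ValueStateDescriptor<Boolean> desc =
                new ValueStateDescriptor<>("seen-transaction-id", Types.BOOLEAN);
        desc.enableTimeToLive(ttl);

        seen = getRuntimeContext().getState(desc);
    }

    @Override
    public boolean filter(T event) throws Exception {
        if (seen.value() != null) {
            return false;          // duplicate within the TTL window: drop it
        }
        seen.update(true);         // first occurrence: remember it and let it through
        return true;
    }
}

If the set of ids within the window is large, the RocksDB state backend keeps this state on local disk rather than on the heap, which avoids the per-event lookup to an external store that you mentioned.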
If for your use-case you can limit the time period in which duplicates may appear, you can use this approach.

Thanks,
Rafi

On Wed, Dec 18, 2019 at 8:16 AM Pooja Agrawal <pooja.ag...@gmail.com> wrote:

> Hey,
>
> I am sorry for the confusion. So, the value is not already present in the
> event. We are reading it from a static table (a kind of data enrichment in
> the flink job). The event above is an enriched event.
> If we say that this is some transaction event, the user would have done
> the transaction once and hence the transaction_id is unique. But the table
> from which we are reading the value may contain a wrong value (not
> always, sometimes because of a bug). In this case, we may want to reprocess
> that transaction event with the new value (here, the transaction_id will be
> the same as before, but the value will change). I hope this clears up the
> scenario. Let me know if you have any other questions.
>
> To solve the idempotency problem, you suggested maintaining a set
> recording transaction_id(s). Since we are aggregating over all events seen
> till now, the number of events, and hence ids, will be too large. I am
> assuming we will need some external store here and a lookup every time we
> process an event. This may increase the latency. Can you suggest an
> efficient way to solve this? And if we do need an external store, what
> would be the best candidate?
>
> Thanks
> Pooja
>
>
> On Wed, Dec 18, 2019 at 8:19 AM Zhu Zhu <reed...@gmail.com> wrote:
>
>> Hi Pooja,
>>
>> I'm a bit confused, since 1) says "If two events have same
>> transaction_id, we can say that they are duplicates", while 2) says
>> "Since this is just a value change, the transaction_id will be same".
>> They look conflicting to me. Usually in case 2) scenarios, the
>> value-update event is considered a new event which does not share the
>> unique id with prior events.
>>
>> If each event has a unique transaction_id, you can use it to
>> de-duplicate the events via a set recording the transaction_id(s) which
>> have already been processed. And then 2) would not be a problem under
>> the unique transaction_id assumption.
>>
>> Thanks,
>> Zhu Zhu
>>
>> On Tue, Dec 17, 2019 at 9:17 PM Pooja Agrawal <pooja.ag...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I have a use case where we are reading events from a kinesis stream.
>>> The event can look like this:
>>>
>>> Event {
>>>   event_id,
>>>   transaction_id,
>>>   key1,
>>>   key2,
>>>   value,
>>>   timestamp,
>>>   some other fields...
>>> }
>>>
>>> We want to aggregate the values per key for all events we have seen
>>> till now (as simple as "select key1, key2, sum(value) from events group
>>> by key1, key2"). For this I have created a simple flink job which uses
>>> the flink-kinesis connector and applies keyBy() and sum() on the
>>> incoming events. I am facing two challenges here:
>>>
>>> 1) The incoming events can have duplicates. How do I maintain
>>> exactly-once processing here, as processing duplicate events can give me
>>> erroneous results? The field transaction_id will be unique for each
>>> event. If two events have the same transaction_id, we can say that they
>>> are duplicates. (By duplicates here, I don't just mean the retried ones.
>>> The same message can be present in kinesis with different sequence
>>> numbers.
>>> I am not sure if the flink-kinesis connector can handle that, as it
>>> uses the KCL underneath, which I assume doesn't take care of it.)
>>>
>>> 2) There can be cases where the value has been updated for a key after
>>> processing the event, and we may want to reprocess those events with the
>>> new value. Since this is just a value change, the transaction_id will be
>>> the same. The idempotency logic will not allow the events to be
>>> reprocessed. What are the ways to handle such scenarios in flink?
>>>
>>> Thanks
>>> Pooja
>>>
>>>
>>> --
>>> Warm Regards,
>>> Pooja Agrawal
>>>
>>
>
> --
> Warm Regards,
> Pooja Agrawal
>