Hi Matthias We have been thinking about this problem recently and thought, wouldn't it be nice if a join could be configured to be '1 time', within the retention period of the join window. So if a join has occurred already on a particular key, further ones will be ignored for the remainder of the retention period (or maybe the retention is reset). Assuming I have not missed a feature on the current DSL.
I suppose the implementation would not be much different from putting a transform after the join as a second state store would be required? Adrian -----Original Message----- From: Matthias J. Sax [mailto:matth...@confluent.io] Sent: 04 May 2017 23:55 To: users@kafka.apache.org Subject: Re: Deduplicating KStream-KStream join Hi, we don't believe in triggers ;) https://www.confluent.io/blog/watermarks-tables-event-time-dataflow-model/ -> Thus, it's a BigQuery flaw to not support updates... (IMHO) (We are also considering improving KStream-KStream join though, but that's of course no short term solution for you: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Discussions) The simplest way to work around this within Streams would be to use a KTable -- but it might not be 100% guaranteed that all duplicates get filtered (as we flush on commit) -- you get only very high probability: http://docs.confluent.io/current/streams/faq.html#how-can-i-convert-a-kstream-to-a-ktable-without-an-aggregation-step However, building a de-duplication by yourself is not so hard basically. Just use a #transform() with a state. Put all results into the state and thus, duplicates "overwrite" older (false) results. The tricky part is only do decide, how long you want to delay your output and evict result from your state and sent it downstream. As it is stream processing, there might always be a late arriving record, and thus, if you decide that at <key, x-null> result should be evicted from your store and sent to BigQuery, you might still be wrong, and later a <key, x-y> updates might arrive in your de-duplication filter... -> this problem is the actual reason, why we don't believe in triggers -- it's a simplification to assume that you can trigger at some point and get a final result (but you can never be sure....) -- be think a cleaner solution is the ability to handle "change" (ie update) end-to-end. Thus, the "cleanest" solution would actually be to make your downstream applications aware of "late update". As you can't do anything about BigQuery, you could let your downstream application deal with it -- ie, when you query BigQuery, filter those result records with some SQL magic. Not sure how feasible this approach would be for your use case. If it does not works, you can still go with the #transform() step if you can define a good policy for how long to wait for late data. Hope this helps. -Matthias On 5/4/17 8:26 AM, Ofir Sharony wrote: > Hi guys, > > I want to perform a join between two KStreams. > An event may appear only on one of the streams (either one of them), > so I can't use inner join (which emits only on a match) or left join > (which emits only when the left input arrives). > This leaves me with outer join. The problem with outer join is that it > emits on every record arrival, which creates duplicates at the output node. > > My downstream application is BigQuery, which doesn't support updates, > thus can't do the dedup by itself. > What is the best practice implementing deduplication in KafkaStreams, > keeping only the latest, most updated record? > Is it possible to emit a record only after some time has passed, or > upon a certain trigger? > > Thanks. > > *Ofir Sharony* > BackEnd Tech Lead > > Mobile: +972-54-7560277 <+972%2054-756-0277> | > ofir.shar...@myheritage.com > | www.myheritage.com > MyHeritage Ltd., 3 Ariel Sharon St., Or Yehuda 60250, Israel > > <http://www.myheritage.com/> > > <https://www.facebook.com/myheritage> > <https://twitter.com/myheritage> <http://blog.myheritage.com/> > <https://www.youtube.com/user/MyHeritageLtd> >