I am not sure I can follow (or what type of join you have in mind). I assume
we are still talking about windowed KStream-KStream joins.
For this scenario with an inner join, the computation is correct as
implemented. Only left/outer joins have the flaw that they might compute a
"false" result: they can emit <key: x-null> followed by <key: x-y>, and the
<key: x-null> record does not align with what you would expect thinking in
terms of SQL. However, SQL is based on batch processing, so its semantics
cannot be applied 1-to-1 to stream processing. Furthermore, a single record
might join multiple times (also for inner joins) within a window. Thus, I am
not sure what you mean by a "1 time" configuration within the retention
period? I am eager to get more details about this idea -- I like joins a lot
and want to make them better out-of-the-box. I am not happy about the current
"false" left/outer join results, but I did not have a good idea of how to
resolve them. Just delaying the output only mitigates the problem; it does not
resolve it completely.

-Matthias
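PS: To make the #transform() idea from my previous mail (quoted below) a
little more concrete, here is a rough sketch against the 0.10.2 Transformer
API. The store name, value types, and punctuation interval are placeholders
only, and a real implementation would also keep per-record timestamps so that
only "old enough" results get evicted -- treat it as a sketch, not a drop-in
implementation:

import java.util.ArrayList;
import java.util.List;

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

// Buffers the latest join result per key; later results for the same key
// overwrite earlier (possibly "false") ones. On punctuation, buffered
// results are forwarded downstream once and removed from the store.
public class JoinDedupTransformer
        implements Transformer<String, String, KeyValue<String, String>> {

    private ProcessorContext context;
    private KeyValueStore<String, String> store;

    @Override
    @SuppressWarnings("unchecked")
    public void init(final ProcessorContext context) {
        this.context = context;
        // the store must be registered on the topology under this name
        this.store = (KeyValueStore<String, String>) context.getStateStore("join-dedup-store");
        // punctuate every 10 minutes (stream-time) -- this interval is the
        // "how long do I wait for late data" policy
        context.schedule(10 * 60 * 1000L);
    }

    @Override
    public KeyValue<String, String> transform(final String key, final String value) {
        // overwrite any preliminary result for this key; emit nothing yet
        store.put(key, value);
        return null;
    }

    @Override
    public KeyValue<String, String> punctuate(final long timestamp) {
        // forward the latest result per key, then evict it from the store;
        // a real implementation would only evict entries older than the delay
        final List<String> evicted = new ArrayList<>();
        try (final KeyValueIterator<String, String> iter = store.all()) {
            while (iter.hasNext()) {
                final KeyValue<String, String> entry = iter.next();
                context.forward(entry.key, entry.value);
                evicted.add(entry.key);
            }
        }
        for (final String key : evicted) {
            store.delete(key);
        }
        return null;
    }

    @Override
    public void close() {}
}

You would register a key-value store named "join-dedup-store" on the topology
and pass that name as the second argument to #transform() right after the
join.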
On 5/5/17 6:54 AM, Adrian McCague wrote:
> Hi Matthias
>
> We have been thinking about this problem recently and thought: wouldn't it
> be nice if a join could be configured to be "1 time" within the retention
> period of the join window? So if a join has already occurred on a particular
> key, further ones would be ignored for the remainder of the retention period
> (or maybe the retention is reset). Assuming I have not missed a feature in
> the current DSL.
>
> I suppose the implementation would not be much different from putting a
> transform after the join, as a second state store would be required?
>
> Adrian
>
> -----Original Message-----
> From: Matthias J. Sax [mailto:matth...@confluent.io]
> Sent: 04 May 2017 23:55
> To: users@kafka.apache.org
> Subject: Re: Deduplicating KStream-KStream join
>
> Hi,
>
> we don't believe in triggers ;)
> https://www.confluent.io/blog/watermarks-tables-event-time-dataflow-model/
>
> -> Thus, it's a BigQuery flaw to not support updates... (IMHO)
>
> (We are also considering improving KStream-KStream joins, though that's of
> course no short-term solution for you:
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Discussions)
>
> The simplest way to work around this within Streams would be to use a
> KTable -- but it is not 100% guaranteed that all duplicates get filtered
> (as we flush on commit) -- you only get a very high probability:
> http://docs.confluent.io/current/streams/faq.html#how-can-i-convert-a-kstream-to-a-ktable-without-an-aggregation-step
>
> However, building the de-duplication yourself is basically not so hard.
> Just use a #transform() with a state store. Put all results into the store
> so that duplicates "overwrite" older (false) results.
>
> The tricky part is only to decide how long you want to delay your output
> before evicting a result from your state and sending it downstream. As this
> is stream processing, there might always be a late-arriving record, and
> thus, if you decide that a <key, x-null> result should be evicted from your
> store and sent to BigQuery, you might still be wrong, and a later
> <key, x-y> update might arrive in your de-duplication filter...
>
> -> This problem is the actual reason why we don't believe in triggers
> -- it's a simplification to assume that you can trigger at some point and
> get a final result (but you can never be sure...) -- we think a cleaner
> solution is the ability to handle "change" (i.e., updates) end-to-end.
>
> Thus, the "cleanest" solution would actually be to make your downstream
> applications aware of "late updates". As you can't do anything about
> BigQuery, you could let your downstream application deal with it -- i.e.,
> when you query BigQuery, filter those result records with some SQL magic.
> Not sure how feasible this approach would be for your use case.
> If it does not work, you can still go with the #transform() step, if you
> can define a good policy for how long to wait for late data.
>
> Hope this helps.
>
> -Matthias
>
> On 5/4/17 8:26 AM, Ofir Sharony wrote:
>> Hi guys,
>>
>> I want to perform a join between two KStreams.
>> An event may appear on only one of the streams (either one of them),
>> so I can't use an inner join (which emits only on a match) or a left join
>> (which emits only when the left input arrives).
>> This leaves me with an outer join. The problem with an outer join is that
>> it emits on every record arrival, which creates duplicates at the output
>> node.
>>
>> My downstream application is BigQuery, which doesn't support updates and
>> thus can't do the dedup by itself.
>> What is the best practice for implementing deduplication in Kafka Streams,
>> keeping only the latest, most up-to-date record?
>> Is it possible to emit a record only after some time has passed, or upon a
>> certain trigger?
>>
>> Thanks.
>>
>> *Ofir Sharony*
>> BackEnd Tech Lead
>>
>> Mobile: +972-54-7560277 | ofir.shar...@myheritage.com | www.myheritage.com
>> MyHeritage Ltd., 3 Ariel Sharon St., Or Yehuda 60250, Israel
>>
>> <http://www.myheritage.com/>
>> <https://www.facebook.com/myheritage>
>> <https://twitter.com/myheritage> <http://blog.myheritage.com/>
>> <https://www.youtube.com/user/MyHeritageLtd>