Thanks Matthias for the detailed explanation. I'll go for the transform() workaround; let's see how it goes.
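For the archives, here is a minimal sketch of what such a transform()-based hold-back could look like. Everything in it is an assumption for illustration -- the class and store names, the 60-second grace period, the isPartial predicate, and the schedule(long)/punctuate(long) callbacks of the 0.10.x-era Processor API (newer releases use a Punctuator instead):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.function.Predicate;

    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.kstream.Transformer;
    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.apache.kafka.streams.state.KeyValueIterator;
    import org.apache.kafka.streams.state.KeyValueStore;

    // Holds back "partial" outer-join results (<key, x-null>) until either a complete
    // result arrives for the same key or an assumed grace period expires.
    public class HoldBackPartialResults<K, V> implements Transformer<K, V, KeyValue<K, V>> {

        public static final String STORE_NAME = "join-dedup-store"; // placeholder; store must be added to the topology
        private static final long GRACE_MS = 60_000L;               // placeholder "how long to wait" policy

        private final Predicate<V> isPartial;                       // e.g. v -> rightSideIsNull(v)
        private ProcessorContext context;
        private KeyValueStore<K, Held<V>> store;

        public HoldBackPartialResults(Predicate<V> isPartial) {
            this.isPartial = isPartial;
        }

        @Override
        @SuppressWarnings("unchecked")
        public void init(ProcessorContext context) {
            this.context = context;
            this.store = (KeyValueStore<K, Held<V>>) context.getStateStore(STORE_NAME);
            context.schedule(GRACE_MS);                              // request periodic punctuate() calls
        }

        @Override
        public KeyValue<K, V> transform(K key, V value) {
            if (!isPartial.test(value)) {
                store.delete(key);                                    // drop any held-back partial result ...
                context.forward(key, value);                          // ... and emit the real one right away
            } else if (store.get(key) == null) {
                store.put(key, new Held<>(context.timestamp(), value)); // hold the partial result back
            }
            return null;                                              // everything is forwarded explicitly above
        }

        @Override
        public KeyValue<K, V> punctuate(long timestamp) {
            // Give up waiting on partial results that are older than the grace period.
            List<K> expired = new ArrayList<>();
            try (KeyValueIterator<K, Held<V>> it = store.all()) {
                while (it.hasNext()) {
                    KeyValue<K, Held<V>> entry = it.next();
                    if (timestamp - entry.value.firstSeenMs >= GRACE_MS) {
                        context.forward(entry.key, entry.value.value);
                        expired.add(entry.key);
                    }
                }
            }
            expired.forEach(store::delete);
            return null;
        }

        @Override
        public void close() {}

        // Wrapper so eviction is per record rather than per punctuate cycle;
        // a real implementation needs a serde for this type.
        public static class Held<V> {
            final long firstSeenMs;
            final V value;
            Held(long firstSeenMs, V value) {
                this.firstSeenMs = firstSeenMs;
                this.value = value;
            }
        }
    }

It would be attached to the join output with KStream#transform(), passing the store name, after registering a matching key-value store on the topology builder.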
Ofir.

*Ofir Sharony*
BackEnd Tech Lead

Mobile: +972-54-7560277 | ofir.shar...@myheritage.com | www.myheritage.com
MyHeritage Ltd., 3 Ariel Sharon St., Or Yehuda 60250, Israel

On Tue, May 9, 2017 at 1:08 AM, Matthias J. Sax <matth...@confluent.io> wrote:

> Ofir,
>
> Yes, KTable would not be 100% reliable:
>
> > The simplest way to work around this within Streams would be to use a
> > KTable -- but it might not be 100% guaranteed that all duplicates get
> > filtered (as we flush on commit)
>
> Your suggestion (1) to add a TTL is unfortunately not easy to implement
> atm. The problem is that on commit, Streams needs to flush all
> stores/caches to guarantee at-least-once semantics. If we did not flush
> on commit, we would need quite complex logic to track which keys got
> flushed and which did not. Not saying it's impossible, but it does not
> follow the overall architecture of Streams atm.
>
> About (2):
>
> > Do these ever clean themselves up, or do they live forever and thus
> > grow indefinitely?
>
> For a non-windowed KTable this is no issue as long as the key space is
> bounded -- updates just overwrite older key-value pairs and we apply
> log compaction to the changelog topic.
>
> For windowed KTables, we apply an additional retention time to both the
> RocksDB store and the changelog topic. As the keys are composite keys
> of (original-key, window-start-timestamp), the key space is unbounded.
> The retention time ensures that old windows that have not been updated
> within the retention period are eventually deleted. The default
> retention time is 1 day, and you can set it on a per-store basis using
> the Windows#until() method when you define the size of your join or
> aggregation window.
>
> > In my scenario, the KStream result of my join represents request logs,
> > which are not really suited to be seen as a KTable, as every request
> > is unique.
>
> Thinking about this twice, your comment makes sense and KTable is no
> solution for your problem.
>
> I guess the only way to filter out "false" <key, x-null> records would
> be a custom transform() with a store that holds the "null" records back
> until a real result is received, or until some delay has passed and you
> assume that there will not be any "real" join result.
>
> We hope to improve join semantics in future releases. For now, I guess
> you need to apply this workaround.
>
>
> -Matthias
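As a side note, the per-store retention Matthias mentions is set on the window specification itself. A small fragment (window size and retention are arbitrary example values; exact signatures vary slightly across the 0.10.x releases):

    import java.util.concurrent.TimeUnit;
    import org.apache.kafka.streams.kstream.JoinWindows;

    // 5-minute join window, but keep the windowed state (RocksDB store + changelog)
    // for 6 hours instead of the 1-day default
    JoinWindows joinWindows = JoinWindows
            .of(TimeUnit.MINUTES.toMillis(5))
            .until(TimeUnit.HOURS.toMillis(6));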
> On 5/6/17 9:46 PM, Ofir Sharony wrote:
> > Max,
> > Thanks for your detailed answer. A couple of comments/questions:
> >
> > 1. Performing caching of a KTable in order to reduce the amount of
> > duplicates, as you mentioned, doesn't provide a 100% solution. From
> > the docs:
> > *"The semantics of caching is that data is flushed to the state store
> > and forwarded to the next downstream processor node whenever the
> > earliest of commit.interval.ms or cache.max.bytes.buffering (cache
> > pressure) hits."*
> > I want to suggest an implementation improvement regarding the time
> > threshold. When using the cache, I would have liked to see a behavior
> > that flushes records a certain time after they were cached (like a
> > cache TTL). In the current implementation, events arriving very close
> > to the next *commit.interval* will be flushed very quickly, so
> > subsequent events with the same key will necessarily create
> > duplicates downstream.
> >
> > 2. Regarding converting the resulting KStream to a KTable - if I
> > understand correctly, this will result in an ever-updating state store
> > (and the same for its backing changelog topic). Do these ever clean
> > themselves up, or do they live forever and thus grow indefinitely?
> > In my scenario, the KStream result of my join represents request logs,
> > which are not really suited to be seen as a KTable, as every request
> > is unique.
> >
> > Thanks,
> > Ofir.
> >
> > *Ofir Sharony*
> > BackEnd Tech Lead
> >
> > Mobile: +972-54-7560277 | ofir.shar...@myheritage.com | www.myheritage.com
> > MyHeritage Ltd., 3 Ariel Sharon St., Or Yehuda 60250, Israel
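The two settings quoted from the docs in that message are ordinary StreamsConfig properties; for reference, a fragment with arbitrary example values:

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    Properties props = new Properties();
    // flush caches and forward records downstream at least every 30 seconds
    props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30 * 1000);
    // total record cache size across all threads (10 MB here); 0 disables caching
    props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);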
> >
> > On Fri, May 5, 2017 at 8:15 PM, Matthias J. Sax <matth...@confluent.io> wrote:
> >
> >> Not sure if I can follow (and what type of join you have in mind).
> >>
> >> I assume we are still talking about windowed KStream-KStream joins.
> >>
> >> For this scenario, and an inner join, the computation is correct as
> >> implemented. Only left/outer joins have a single flaw: they might
> >> compute a "false" result, as they might emit both <key: x-null> and
> >> <key: x-y>, and the <key: x-null> does not align with what you would
> >> expect when thinking in terms of SQL.
> >>
> >> However, SQL is based on batch processing, thus its semantics cannot
> >> be applied 1-to-1 to stream processing.
> >>
> >> Furthermore, a single record might join multiple times (also for
> >> inner joins) within a window. Thus, I am not sure what you mean by a
> >> '1 time' configuration within the retention period?
> >>
> >> Eager to get more details about this idea -- I like joins a lot and
> >> want to make them better out-of-the-box -- I am not happy about the
> >> current "false" left/outer join results but did not have a good idea
> >> about how to resolve it. Just delaying only mitigates the problem but
> >> does not resolve it completely.
> >>
> >>
> >> -Matthias
> >>
> >>
> >> On 5/5/17 6:54 AM, Adrian McCague wrote:
> >>> Hi Matthias
> >>>
> >>> We have been thinking about this problem recently and thought,
> >>> wouldn't it be nice if a join could be configured to be '1 time',
> >>> within the retention period of the join window. So if a join has
> >>> occurred already on a particular key, further ones will be ignored
> >>> for the remainder of the retention period (or maybe the retention is
> >>> reset). Assuming I have not missed a feature on the current DSL.
> >>>
> >>> I suppose the implementation would not be much different from
> >>> putting a transform after the join, as a second state store would be
> >>> required?
> >>>
> >>> Adrian
> >>>
> >>> -----Original Message-----
> >>> From: Matthias J. Sax [mailto:matth...@confluent.io]
> >>> Sent: 04 May 2017 23:55
> >>> To: users@kafka.apache.org
> >>> Subject: Re: Deduplicating KStream-KStream join
> >>>
> >>> Hi,
> >>>
> >>> we don't believe in triggers ;)
> >>> https://www.confluent.io/blog/watermarks-tables-event-time-dataflow-model/
> >>>
> >>> -> Thus, it's a BigQuery flaw to not support updates... (IMHO)
> >>>
> >>> (We are also considering improving the KStream-KStream join though,
> >>> but that's of course no short-term solution for you:
> >>> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Discussions)
> >>>
> >>> The simplest way to work around this within Streams would be to use
> >>> a KTable -- but it might not be 100% guaranteed that all duplicates
> >>> get filtered (as we flush on commit) -- you get only a very high
> >>> probability:
> >>> http://docs.confluent.io/current/streams/faq.html#how-can-i-convert-a-kstream-to-a-ktable-without-an-aggregation-step
> >>>
> >>> However, building the de-duplication yourself is basically not that
> >>> hard. Just use a #transform() with a state store. Put all results
> >>> into the state store; thus, duplicates "overwrite" older (false)
> >>> results.
> >>>
> >>> The tricky part is only to decide how long you want to delay your
> >>> output before evicting a result from your state and sending it
> >>> downstream. As it is stream processing, there might always be a late
> >>> arriving record, and thus, if you decide that a <key, x-null> result
> >>> should be evicted from your store and sent to BigQuery, you might
> >>> still be wrong, and later a <key, x-y> update might arrive in your
> >>> de-duplication filter...
> >>>
> >>> -> This problem is the actual reason why we don't believe in
> >>> triggers -- it's a simplification to assume that you can trigger at
> >>> some point and get a final result (but you can never be sure...) --
> >>> we think a cleaner solution is the ability to handle "change" (ie,
> >>> updates) end-to-end.
> >>>
> >>> Thus, the "cleanest" solution would actually be to make your
> >>> downstream applications aware of "late updates". As you can't do
> >>> anything about BigQuery, you could let your downstream application
> >>> deal with it -- ie, when you query BigQuery, filter those result
> >>> records with some SQL magic. Not sure how feasible this approach
> >>> would be for your use case.
> >>>
> >>> If that does not work, you can still go with the #transform() step
> >>> if you can define a good policy for how long to wait for late data.
> >>>
> >>>
> >>> Hope this helps.
> >>>
> >>>
> >>> -Matthias
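The KStream-to-KTable FAQ linked above boils down to a keep-the-latest reduce. Roughly the following, assuming joined is the outer-join result stream shown at the end of this thread; the store name is a placeholder and the available reduce() overloads differ between releases:

    // collapse the duplicate-producing join output to "latest value per key";
    // as discussed above, flushing on commit makes this high probability, not a guarantee
    KTable<String, String> latest = joined
            .groupByKey()
            .reduce((older, newer) -> newer, "join-latest-store");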
> >>>
> >>> On 5/4/17 8:26 AM, Ofir Sharony wrote:
> >>>> Hi guys,
> >>>>
> >>>> I want to perform a join between two KStreams.
> >>>> An event may appear only on one of the streams (either one of
> >>>> them), so I can't use an inner join (which emits only on a match)
> >>>> or a left join (which emits only when the left input arrives).
> >>>> This leaves me with an outer join. The problem with the outer join
> >>>> is that it emits on every record arrival, which creates duplicates
> >>>> at the output node.
> >>>>
> >>>> My downstream application is BigQuery, which doesn't support
> >>>> updates, thus can't do the dedup by itself.
> >>>> What is the best practice for implementing deduplication in Kafka
> >>>> Streams, keeping only the latest, most updated record?
> >>>> Is it possible to emit a record only after some time has passed, or
> >>>> upon a certain trigger?
> >>>>
> >>>> Thanks.
> >>>>
> >>>> *Ofir Sharony*
> >>>> BackEnd Tech Lead
> >>>>
> >>>> Mobile: +972-54-7560277 | ofir.shar...@myheritage.com | www.myheritage.com
> >>>> MyHeritage Ltd., 3 Ariel Sharon St., Or Yehuda 60250, Israel
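And for completeness, the setup from the original question above -- the windowed outer join that emits both the <key, x-null> and the <key, x-y> records this whole thread is about. Stream names, value types, the joiner and the window size are placeholders:

    // an event may arrive on either stream, so only outerJoin emits in every case; the
    // price is that a one-sided arrival emits <key, x-null> (or <key, null-y>) right away,
    // and a later match emits <key, x-y> again for the same key -- the duplicates in question
    KStream<String, String> joined = leftStream.outerJoin(
            rightStream,
            (leftValue, rightValue) -> leftValue + "," + rightValue,
            JoinWindows.of(TimeUnit.MINUTES.toMillis(5)));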