Thanks Matthias for the detailed explanation.
I'll go with the transform() workaround; let's see how it goes.

Ofir.

*Ofir Sharony*
BackEnd Tech Lead

Mobile: +972-54-7560277 | ofir.shar...@myheritage.com | www.myheritage.com
MyHeritage Ltd., 3 Ariel Sharon St., Or Yehuda 60250, Israel



On Tue, May 9, 2017 at 1:08 AM, Matthias J. Sax <matth...@confluent.io>
wrote:

> Ofir,
>
> Yes, a KTable would not be 100% reliable:
>
> > The simplest way to work around this within Streams would be to use a
> > KTable -- but it might not be 100% guaranteed that all duplicates get
> > filtered (as we flush on commit)
>
> Your suggestion (1) to add a TTL is unfortunately not easy to
> implement atm. The problem is that on commit, Streams needs to flush
> all stores/caches to guarantee at-least-once semantics. If we did not
> flush on commit, we would need quite complex logic to track which keys
> got flushed and which did not. Not saying it's impossible, but it does
> not follow the overall architecture of Streams atm.
>
> About (2):
>
> > Do these ever clean themselves up, or do they live forever and thus
> > grow indefinitely?
>
> For a non-windowed KTable this is not an issue as long as the key space
> is bounded -- updates just overwrite older key-value pairs and we apply
> log compaction to the changelog topic.
>
> For windowed KTables, we apply an additional retention time to both the
> RocksDB store and the changelog topic. As the keys are composite keys
> of (original-key, window-start-timestamp), the key space is unbounded.
> The retention time ensures that old windows that have not been updated
> within the retention period are eventually deleted. The default
> retention time is 1 day and you can set it on a per-store basis using
> the Windows#until() method when you define the size of your join or
> aggregation window.
>
> > In my scenario, the KStream result of my join represents request logs,
> >    which are not really suited to be seen as a KTable, as every request
> is
> >    unique.
>
> Thinking about this twice, your comment makes sense and a KTable is no
> solution to your problem.
>
>
> I guess the only way to filter out "false" <key, x-null> records would
> be a custom transform() with a store that holds "null" records back
> until either a real result is received, or some delay has passed and
> you assume that there will not be any "real" join result.
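>
> To illustrate, here is a minimal sketch of such a transformer
> (hypothetical names; it assumes string keys/values and a state store
> named "join-dedup-store" registered on the topology -- see the wiring
> sketch further down the thread). For simplicity it evicts everything
> buffered so far on each punctuation; a more careful implementation
> would also store insert timestamps and only evict entries older than
> maxWaitMs:
>
> import org.apache.kafka.streams.KeyValue;
> import org.apache.kafka.streams.kstream.Transformer;
> import org.apache.kafka.streams.processor.ProcessorContext;
> import org.apache.kafka.streams.state.KeyValueIterator;
> import org.apache.kafka.streams.state.KeyValueStore;
>
> import java.util.ArrayList;
> import java.util.List;
>
> public class JoinDedupTransformer
>         implements Transformer<String, String, KeyValue<String, String>> {
>
>     private final long maxWaitMs;
>     private ProcessorContext context;
>     private KeyValueStore<String, String> store;
>
>     public JoinDedupTransformer(final long maxWaitMs) {
>         this.maxWaitMs = maxWaitMs;
>     }
>
>     @Override
>     @SuppressWarnings("unchecked")
>     public void init(final ProcessorContext context) {
>         this.context = context;
>         this.store = (KeyValueStore<String, String>)
>                 context.getStateStore("join-dedup-store");
>         context.schedule(maxWaitMs); // call punctuate() every maxWaitMs
>     }
>
>     @Override
>     public KeyValue<String, String> transform(final String key,
>                                               final String value) {
>         // buffer the result instead of forwarding it; a later "real"
>         // join result for the same key overwrites the <key, x-null>
>         store.put(key, value);
>         return null; // emit nothing here
>     }
>
>     @Override
>     public KeyValue<String, String> punctuate(final long timestamp) {
>         // drain the store and forward the latest result per key
>         final List<KeyValue<String, String>> results = new ArrayList<>();
>         try (final KeyValueIterator<String, String> iter = store.all()) {
>             while (iter.hasNext()) {
>                 results.add(iter.next());
>             }
>         }
>         for (final KeyValue<String, String> result : results) {
>             context.forward(result.key, result.value);
>             store.delete(result.key);
>         }
>         return null;
>     }
>
>     @Override
>     public void close() {}
> }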
>
> We hope to improve join semantics in future releases. For now, I guess
> you need to apply this workaround.
>
>
> -Matthias
>
>
>
>
>
> On 5/6/17 9:46 PM, Ofir Sharony wrote:
> > Max,
> > Thanks for your detailed answer. Couple of comments/questions:
> >
> >    1. When performing caching of a KTable in order to reduce the
> >    amount of duplicates, as you mentioned, it doesn't provide a 100%
> >    solution. From the docs:
> >    *"The semantics of caching is that data is flushed to the state
> >    store and forwarded to the next downstream processor node whenever
> >    the earliest of commit.interval.ms or cache.max.bytes.buffering
> >    (cache pressure) hits".*
> >    I want to suggest an implementation improvement regarding the time
> >    threshold. When using the cache, I would have liked to see a
> >    behavior that flushes records a certain time after they were cached
> >    (like a cache-TTL). In the current implementation, events arriving
> >    very close to the next *commit.interval* will be flushed very
> >    quickly, thus subsequent events with the same key will necessarily
> >    create duplicates downstream. (The two relevant configs are shown
> >    in the sketch after this list.)
> >
> >    2. In regards to converting the resulting KStream to a KTable - if
> >    I understand correctly, this will result in an ever-updating state
> >    store (and the same for its backing changelog topic). Do these ever
> >    clean themselves up, or do they live forever and thus grow
> >    indefinitely?
> >    In my scenario, the KStream result of my join represents request
> >    logs, which are not really suited to be seen as a KTable, as every
> >    request is unique.
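> >
> >    As a reference for (1), these are the two configs that control the
> >    flush behavior (a sketch with illustrative values only):
> >
> >    import java.util.Properties;
> >    import org.apache.kafka.streams.StreamsConfig;
> >
> >    Properties props = new Properties();
> >    // flush/forward at the latest every 30 seconds...
> >    props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30 * 1000);
> >    // ...or earlier, when the cache exceeds 10 MB
> >    props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG,
> >              10 * 1024 * 1024L);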
> >
> > Thanks,
> > Ofir.
> >
> >
> > On Fri, May 5, 2017 at 8:15 PM, Matthias J. Sax <matth...@confluent.io>
> > wrote:
> >
> >> Not sure if I can follow (and what type of join you have in mind).
> >>
> >> I assume we are still talking about windowed KStream-KStream joins.
> >>
> >> For this scenario, and an inner join, the computation is correct as
> >> implemented. Only left/outer joins have a single flaw: they might
> >> compute a "false" result, as they might emit <key: x-null> and
> >> <key: x-y> (ie, the <key: x-null> does not align with what you would
> >> expect thinking in terms of SQL).
> >>
> >> However, SQL is based on batch processing, thus its semantics cannot
> >> be applied 1-to-1 to stream processing.
> >>
> >> Furthermore, a single record might join multiple times (also for
> >> inner joins) within a window. Thus, I am not sure what you mean by a
> >> '1 time' configuration within the retention period?
> >>
> >> Eager to get more details about this idea -- I like joins a lot and
> >> want to make them better out-of-the-box -- I am not happy about the
> >> current "false" left/outer join results but did not have a good idea
> >> about how to resolve it. Just delaying only mitigates the problem but
> >> does not resolve it completely.
> >>
> >>
> >> -Matthias
> >>
> >>
> >> On 5/5/17 6:54 AM, Adrian McCague wrote:
> >>> Hi Matthias
> >>>
> >>> We have been thinking about this problem recently and thought,
> >>> wouldn't it be nice if a join could be configured to be '1 time'
> >>> within the retention period of the join window? So if a join has
> >>> already occurred on a particular key, further ones will be ignored
> >>> for the remainder of the retention period (or maybe the retention is
> >>> reset). Assuming I have not missed a feature on the current DSL.
> >>>
> >>> I suppose the implementation would not be much different from
> >>> putting a transform after the join, as a second state store would be
> >>> required?
> >>>
> >>> Adrian
> >>>
> >>> -----Original Message-----
> >>> From: Matthias J. Sax [mailto:matth...@confluent.io]
> >>> Sent: 04 May 2017 23:55
> >>> To: users@kafka.apache.org
> >>> Subject: Re: Deduplicating KStream-KStream join
> >>>
> >>> Hi,
> >>>
> >>> we don't believe in triggers ;)
> >>> https://www.confluent.io/blog/watermarks-tables-event-time-dataflow-model/
> >>>
> >>> -> Thus, it's a BigQuery flaw to not support updates... (IMHO)
> >>>
> >>> (We are also considering improving KStream-KStream join though, but
> >>> that's of course no short-term solution for you:
> >>> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Discussions)
> >>>
> >>> The simplest way to work around this within Streams would be to use
> >>> a KTable -- but it might not be 100% guaranteed that all duplicates
> >>> get filtered (as we flush on commit) -- you get only a very high
> >>> probability:
> >>> http://docs.confluent.io/current/streams/faq.html#how-can-i-convert-a-kstream-to-a-ktable-without-an-aggregation-step
> >>>
> >>>
> >>> However, building a de-duplication by yourself is basically not so
> >>> hard. Just use a #transform() with a state. Put all results into the
> >>> state and thus, duplicates "overwrite" older (false) results.
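> >>>
> >>> For example, a wiring sketch (hypothetical names; JoinDedupTransformer
> >>> refers to the transformer sketched earlier in this thread, "joined" is
> >>> the result of the outer join, and "builder" is the KStreamBuilder):
> >>>
> >>> // create the store the transformer uses and register it
> >>> StateStoreSupplier dedupStore = Stores.create("join-dedup-store")
> >>>     .withStringKeys()
> >>>     .withStringValues()
> >>>     .persistent()
> >>>     .build();
> >>> builder.addStateStore(dedupStore);
> >>>
> >>> // hold results back for (up to) 5 minutes before emitting
> >>> KStream<String, String> deduped = joined.transform(
> >>>     () -> new JoinDedupTransformer(TimeUnit.MINUTES.toMillis(5)),
> >>>     "join-dedup-store");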
> >>>
> >>> The tricky part is only to decide how long you want to delay your
> >>> output, ie, when to evict a result from your state and send it
> >>> downstream. As it is stream processing, there might always be a
> >>> late-arriving record, and thus, if you decide that a <key, x-null>
> >>> result should be evicted from your store and sent to BigQuery, you
> >>> might still be wrong, and a later <key, x-y> update might arrive in
> >>> your de-duplication filter...
> >>>
> >>> -> this problem is the actual reason why we don't believe in
> >>> triggers -- it's a simplification to assume that you can trigger at
> >>> some point and get a final result (but you can never be sure...) --
> >>> we think a cleaner solution is the ability to handle "change" (ie,
> >>> updates) end-to-end.
> >>>
> >>> Thus, the "cleanest" solution would actually be to make your
> >>> downstream applications aware of "late updates". As you can't do
> >>> anything about BigQuery, you could let your downstream application
> >>> deal with it -- ie, when you query BigQuery, filter those result
> >>> records with some SQL magic. Not sure how feasible this approach
> >>> would be for your use case.
> >>> If it does not work, you can still go with the #transform() step, if
> >>> you can define a good policy for how long to wait for late data.
> >>>
> >>>
> >>> Hope this helps.
> >>>
> >>>
> >>> -Matthias
> >>>
> >>>
> >>>
> >>>
> >>> On 5/4/17 8:26 AM, Ofir Sharony wrote:
> >>>> Hi guys,
> >>>>
> >>>> I want to perform a join between two KStreams.
> >>>> An event may appear only on one of the streams (either one of them),
> >>>> so I can't use inner join (which emits only on a match) or left join
> >>>> (which emits only when the left input arrives).
> >>>> This leaves me with outer join. The problem with outer join is that
> >>>> it emits on every record arrival, which creates duplicates at the
> >>>> output node.
> >>>>
> >>>> My downstream application is BigQuery, which doesn't support
> >>>> updates, thus can't do the dedup by itself.
> >>>> What is the best practice for implementing deduplication in Kafka
> >>>> Streams, keeping only the latest, most updated record?
> >>>> Is it possible to emit a record only after some time has passed, or
> >>>> upon a certain trigger?
> >>>>
> >>>> Thanks.
> >>>>
> >>>
> >>
> >>
> >
>
>
