Not sure if I can follow (and what type of join you have in mind).

I assume we are still talking about windowed KStream-KStream joins.

For this scenario, and an inner join, the computation is correct as
implements. Only left/outer join have a single flaw that they might
compute a "false" result as it might emit <key: x-null> and <key: x-y>
(ie, <key: x-null>) does not align with what you would expect thinking
in terms of SQL.

However, SQL is based on batch processing thus semantics cannot be
applied 1-to-1 to stream processing.

Furthermore, a single record might join multiple times (also for inner
joins) within a window. Thus, I am not sure what you mean by '1 time'
configuration within retention period?

Eager to get more details about this idea -- I like joins a lot and what
to make them better out-of-the-box -- I am not happy about the current
"false" left/outer join results but did not have a good idea about how
to resolve it. Just delaying does only mitigate the problem but not
resolve it completely.


-Matthias


On 5/5/17 6:54 AM, Adrian McCague wrote:
> Hi Matthias
> 
> We have been thinking about this problem recently and thought, wouldn't it be 
> nice if a join could be configured to be '1 time', within the retention 
> period of the join window. So if a join has occurred already on a particular 
> key, further ones will be ignored for the remainder of the retention period 
> (or maybe the retention is reset). Assuming I have not missed a feature on 
> the current DSL.
> 
> I suppose the implementation would not be much different from putting a 
> transform after the join as a second state store would be required?
> 
> Adrian
>  
> -----Original Message-----
> From: Matthias J. Sax [mailto:matth...@confluent.io] 
> Sent: 04 May 2017 23:55
> To: users@kafka.apache.org
> Subject: Re: Deduplicating KStream-KStream join
> 
> Hi,
> 
> we don't believe in triggers ;)
> https://www.confluent.io/blog/watermarks-tables-event-time-dataflow-model/
> 
> -> Thus, it's a BigQuery flaw to not support updates... (IMHO)
> 
> (We are also considering improving KStream-KStream join though, but that's of 
> course no short term solution for you:
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Discussions)
> 
> The simplest way to work around this within Streams would be to use a KTable 
> -- but it might not be 100% guaranteed that all duplicates get filtered (as 
> we flush on commit) -- you get only very high probability:
> http://docs.confluent.io/current/streams/faq.html#how-can-i-convert-a-kstream-to-a-ktable-without-an-aggregation-step
> 
> 
> However, building a de-duplication by yourself is not so hard basically.
> Just use a #transform() with a state. Put all results into the state and 
> thus, duplicates "overwrite" older (false) results.
> 
> The tricky part is only do decide, how long you want to delay your output and 
> evict result from your state and sent it downstream. As it is stream 
> processing, there might always be a late arriving record, and thus, if you 
> decide that at <key, x-null> result should be evicted from your store and 
> sent to BigQuery, you might still be wrong, and later a <key, x-y> updates 
> might arrive in your de-duplication filter...
> 
> -> this problem is the actual reason, why we don't believe in triggers
> -- it's a simplification to assume that you can trigger at some point and get 
> a final result (but you can never be sure....) -- be think a cleaner solution 
> is the ability to handle "change" (ie update) end-to-end.
> 
> Thus, the "cleanest" solution would actually be to make your downstream 
> applications aware of "late update". As you can't do anything about BigQuery, 
> you could let your downstream application deal with it -- ie, when you query 
> BigQuery, filter those result records with some SQL magic. Not sure how 
> feasible this approach would be for your use case.
> If it does not works, you can still go with the #transform() step if you can 
> define a good policy for how long to wait for late data.
> 
> 
> Hope this helps.
> 
> 
> -Matthias
> 
> 
> 
> 
> On 5/4/17 8:26 AM, Ofir Sharony wrote:
>> Hi guys,
>>
>> I want to perform a join between two KStreams.
>> An event may appear only on one of the streams (either one of them), 
>> so I can't use inner join (which emits only on a match) or left join 
>> (which emits only when the left input arrives).
>> This leaves me with outer join. The problem with outer join is that it 
>> emits on every record arrival, which creates duplicates at the output node.
>>
>> My downstream application is BigQuery, which doesn't support updates, 
>> thus can't do the dedup by itself.
>> What is the best practice implementing deduplication in KafkaStreams, 
>> keeping only the latest, most updated record?
>> Is it possible to emit a record only after some time has passed, or 
>> upon a certain trigger?
>>
>> Thanks.
>>
>> *Ofir Sharony*
>> BackEnd Tech Lead
>>
>> Mobile: +972-54-7560277 <+972%2054-756-0277> | 
>> ofir.shar...@myheritage.com
>> | www.myheritage.com
>> MyHeritage Ltd., 3 Ariel Sharon St., Or Yehuda 60250, Israel
>>
>> <http://www.myheritage.com/>
>>
>> <https://www.facebook.com/myheritage>
>> <https://twitter.com/myheritage>         <http://blog.myheritage.com/>
>>     <https://www.youtube.com/user/MyHeritageLtd>
>>
> 

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to