Hi,

I just realized that this thread somehow got dropped. Sorry about that.

If you use a KTable, each update to RocksDB is also written to a
changelog topic (for fault-tolerance and rebalancing). The changelog
topic is a *compacted topic*, so the latest value for each key is
guaranteed never to be deleted, as long as no tombstone (a <key:null>
message) gets written for that key. Your aggregated data therefore
stays available.
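
A minimal sketch of what compaction retains, in plain Java with no Kafka
dependency. CompactionSketch, Entry and compact are made-up names for
illustration only. It is also a simplification: real compaction runs in the
background on closed log segments, so older values can stay around for a
while before they are cleaned up.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CompactionSketch {
    /** One changelog record; a null value is a tombstone (<key:null>). */
    static final class Entry {
        final String key;
        final String value;

        Entry(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    /** Returns what a fully compacted log would retain: the latest value per key. */
    static Map<String, String> compact(List<Entry> log) {
        Map<String, String> latest = new LinkedHashMap<>();
        for (Entry e : log) {
            if (e.value == null) {
                latest.remove(e.key);       // tombstone: the key is dropped
            } else {
                latest.put(e.key, e.value); // a newer value replaces the older one
            }
        }
        return latest;
    }

    public static void main(String[] args) {
        List<Entry> changelog = Arrays.asList(
                new Entry("0001", "[a]"),
                new Entry("0002", "[x]"),
                new Entry("0001", "[a, b]"), // update for key 0001
                new Entry("0002", null));    // tombstone for key 0002
        System.out.println(compact(changelog)); // prints {0001=[a, b]}
    }
}
```

Only the latest aggregate for key 0001 survives; key 0002 disappears because
a tombstone was written for it.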

-Matthias

On 09/12/2016 09:26 AM, Guillermo Lammers Corral wrote:
> Thanks for the information, very useful.
> 
> Finally, I have two KStreams. I use aggregateByKey on one of them to get a
> <key, List[object]> (I've assumed that we'll receive this source first),
> so that I can join the result with the other KStream by key using a
> KStream-KTable join.
> 
> Now, I'm trying to understand the state store logic (RocksDB plus
> changelog topics).
> 
> Will the aggregated data always be available? When do the messages
> published to the Kafka changelog topics get deleted? What happens in that case?
> 
> Thanks!
> 
> 2016-09-06 13:33 GMT+02:00 Michael Noll <mich...@confluent.io>:
> 
>> Also, another upcoming feature is (slightly simplified description follows)
>> "global" state/KTable.  Today, a KTable is always partitioned/sharded.  One
>> advantage of global state/tables will be for use cases where you need to
>> perform non-key joins, similar to what Guillermo described previously in
>> this thread.  Keep in mind that global tables imply that each
>> corresponding stream task will get a full, replicated copy of the table, so
>> you must pay attention to the memory/disk footprint.
>>
>> On Tue, Sep 6, 2016 at 10:31 AM, Matthias J. Sax <matth...@confluent.io>
>> wrote:
>>
>>> Hi,
>>>
>>> currently, the DSL supports only primary-key joins. However, in your
>>> case, you have a non-primary-key join. There is already a JIRA to add
>>> support for this: https://issues.apache.org/jira/browse/KAFKA-3705
>>>
>>> Currently, you will need to use the Processor API. For the non-time-based
>>> input (if not too large) you could replicate it to all application
>>> instances and build a hash table (a simple in-memory HashMap might be
>>> sufficient). If the input is too large to fit into memory, you need to
>>> build a more sophisticated hash table that also uses the disk.
>>>
>>> For replication, you will need an additional KafkaConsumer that assigns
>>> all partitions to itself manually and never commits its offsets
>>> (offset commits only work correctly if each partition is assigned only
>>> once, but in your case it would be assigned multiple times, depending
>>> on the overall parallelism of your Streams app).
>>>
>>> For the time-based input, you can just read it regularly, and for each
>>> record you do a lookup in the hash table to compute the join.
>>>
>>> Does this make sense?
>>>
>>>
>>> -Matthias
>>>
>>> On 09/05/2016 11:28 PM, Guillermo Lammers Corral wrote:
>>>> Hi Matthias,
>>>>
>>>> Good question... the main problem is related to the nature of my data.
>>>> The first source of data is time-based and the second one is not, but
>>>> both have a field with the same value (I don't know how to use it in
>>>> the join without it being the key. It can't be; let me explain why):
>>>>
>>>> ObjectX (sameValue, date, valueX)
>>>> ObjectY (uniqueId, sameValue, valueY)
>>>>
>>>> I want to create a result object based on X and Y using sameValue as
>>>> "key", but there are some problems here:
>>>>
>>>>    - sameValue of ObjectX cannot be the key because I must also take
>>>>    the date into account
>>>>    - sameValue of ObjectY cannot be the key because sameValue is not the
>>>>    key of ObjectX (we couldn't join anything)
>>>>    - uniqueId of ObjectY cannot be the key because it does not exist in
>>>>    ObjectX (we couldn't join anything)
>>>>    - I couldn't use something like sameValue_date as the key because date
>>>>    does not exist in ObjectY (we couldn't join anything)
>>>>
>>>> So, actually, I don't know how to implement this using Kafka Streams. I
>>>> need to join data using a value field of each message (sameValue, but not
>>>> as key) and do it indefinitely, because I don't know when data will be
>>>> sent, whereas the process will always be creating new result objects.
>>>>
>>>> Basically, I want to use streaming with Kafka Streams to make joins
>>>> between two sources of data, but we cannot use KTable (key problems) and
>>>> we cannot use windowed KStream (or we can, but with memory issues, as you
>>>> said) because I don't know when data will arrive and I cannot lose data
>>>> (any match between both sources).
>>>>
>>>> Do you see any solution? Will I have to use the Processor API instead of
>>>> the DSL to spill data to disk, as you said?
>>>>
>>>> Thanks in advance!
>>>>
>>>> 2016-09-05 20:00 GMT+02:00 Matthias J. Sax <matth...@confluent.io>:
>>>>
>>>>> Hey,
>>>>>
>>>>> are you sure you want to join everything? This will result in a huge
>>>>> memory footprint for your application. You are right that you cannot
>>>>> use KTable; however, windowed KStream joins would work. You only need to
>>>>> specify a huge window (i.e., use Long.MAX_VALUE; this will effectively
>>>>> be "infinitely large") so that all data falls into a single window.
>>>>>
>>>>> The issue will be that all data will be buffered in memory; thus, if
>>>>> your application runs for very long, it will eventually fail (I would
>>>>> assume). Thus, again, my initial question: are you sure you want to
>>>>> join everything? (It's stream processing, not batch processing...)
>>>>>
>>>>> If the answer is still yes, and you hit a memory issue, you will need
>>>>> to fall back to the Processor API instead of the DSL to spill data to
>>>>> disk if it does not fit into memory any more (i.e., you will need to
>>>>> implement your own version of a symmetric hash join that spills to
>>>>> disk). Of course, the disk usage will also be huge. Eventually, your
>>>>> disk might also become too small...
>>>>>
>>>>> Can you clarify why you want to join everything? This does not sound
>>>>> like a good idea. Very large windows are manageable, but "infinite"
>>>>> windows are very problematic in stream processing.
>>>>>
>>>>>
>>>>> -Matthias
>>>>>
>>>>>
>>>>> On 09/05/2016 06:25 PM, Guillermo Lammers Corral wrote:
>>>>>> Hi,
>>>>>>
>>>>>> I've been thinking about how to solve one of my business processes
>>>>>> with Kafka Streams, without success so far. I hope someone can help me.
>>>>>>
>>>>>> I am reading events like the following from two topics (I'll simplify
>>>>>> the problem at this point):
>>>>>>
>>>>>> ObjectX
>>>>>> Key: String
>>>>>> Value: String
>>>>>>
>>>>>> ObjectY
>>>>>> Key: String
>>>>>> Value: String
>>>>>>
>>>>>> I want to do some kind of "join" over all events, without windowing but
>>>>>> also without using KTables...
>>>>>>
>>>>>> Example:
>>>>>>
>>>>>> ==============================
>>>>>>
>>>>>> ObjectX("0001", "a") -> Topic A
>>>>>>
>>>>>> Expected output TopicResult:
>>>>>>
>>>>>> nothing
>>>>>>
>>>>>> ==============================
>>>>>>
>>>>>> ObjectX("0001", "b") -> Topic A
>>>>>>
>>>>>> Expected output TopicResult:
>>>>>>
>>>>>> nothing
>>>>>>
>>>>>> ==============================
>>>>>>
>>>>>> ObjectY("0001", "d") -> Topic B:
>>>>>>
>>>>>> Expected output TopicResult:
>>>>>>
>>>>>> ObjectZ("0001", ("a", "d"))
>>>>>> ObjectZ("0001", ("b", "d"))
>>>>>>
>>>>>> ==============================
>>>>>>
>>>>>> ObjectY("0001", "e") -> Topic B:
>>>>>>
>>>>>> Expected output TopicResult:
>>>>>>
>>>>>> ObjectZ("0001", ("a", "e"))
>>>>>> ObjectZ("0001", ("b", "e"))
>>>>>>
>>>>>> ==============================
>>>>>>
>>>>>> TopicResult at the end:
>>>>>>
>>>>>> ObjectZ("0001", ("a", "d"))
>>>>>> ObjectZ("0001", ("b", "d"))
>>>>>> ObjectZ("0001", ("a", "e"))
>>>>>> ObjectZ("0001", ("b", "e"))
>>>>>>
>>>>>> ==============================
>>>>>>
>>>>>> I think I can't use a KTable-KTable join because I want to match all
>>>>>> the events from the beginning of time. I can't use a KStream-KStream
>>>>>> join either, because it forces me to use windowing. Same for a
>>>>>> KStream-KTable join...
>>>>>>
>>>>>> Could any expert using Kafka Streams help me with some tips?
>>>>>>
>>>>>> Thanks in advance.
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>>
>>
> 
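
For reference, the symmetric hash join discussed in the quoted thread can be
sketched in plain Java with no Kafka dependency. This is only an illustration
under stated assumptions, not a Processor API implementation:
SymmetricHashJoinSketch, onX and onY are made-up names, and both sides are
kept in in-memory maps that grow without bound, which is the memory concern
raised above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SymmetricHashJoinSketch {
    // Every value seen so far on each side, grouped by join key.
    // Nothing is ever evicted (an "infinite" window).
    private final Map<String, List<String>> seenX = new HashMap<>();
    private final Map<String, List<String>> seenY = new HashMap<>();

    /** Handles a record from Topic A: store it, then probe the Y side. */
    public List<String> onX(String key, String x) {
        seenX.computeIfAbsent(key, k -> new ArrayList<>()).add(x);
        List<String> out = new ArrayList<>();
        for (String y : seenY.getOrDefault(key, Collections.emptyList())) {
            out.add(format(key, x, y));
        }
        return out;
    }

    /** Handles a record from Topic B: store it, then probe the X side. */
    public List<String> onY(String key, String y) {
        seenY.computeIfAbsent(key, k -> new ArrayList<>()).add(y);
        List<String> out = new ArrayList<>();
        for (String x : seenX.getOrDefault(key, Collections.emptyList())) {
            out.add(format(key, x, y));
        }
        return out;
    }

    private static String format(String key, String x, String y) {
        return "ObjectZ(\"" + key + "\", (\"" + x + "\", \"" + y + "\"))";
    }

    public static void main(String[] args) {
        SymmetricHashJoinSketch join = new SymmetricHashJoinSketch();
        System.out.println(join.onX("0001", "a")); // [] (no Y seen yet)
        System.out.println(join.onX("0001", "b")); // []
        System.out.println(join.onY("0001", "d")); // joins "d" with "a" and "b"
        System.out.println(join.onY("0001", "e")); // joins "e" with "a" and "b"
    }
}
```

Fed with the events from the original example, this emits the same four
ObjectZ results listed for TopicResult.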
