Hi, I just realized that this thread somehow got dropped... Sorry for that.
If you use a KTable, each update to RocksDB is also written into a changelog topic (for fault tolerance and rebalancing). The changelog topic is a *compacted topic*, so it is guaranteed that the latest value for each key is never deleted (as long as no <key:null> message gets written).

-Matthias

On 09/12/2016 09:26 AM, Guillermo Lammers Corral wrote:
> Thanks for the information, very useful.
>
> Finally, I have two KStreams. I use aggregateByKey on one of them to get a
> <key, List[object]> (I've assumed that we'll receive this source first), so
> I can join the result with the other KStream by key using a KStream-KTable
> join.
>
> Now, I'm trying to understand the state store logic (RocksDB plus changelog
> topics).
>
> Will the aggregated data always be available? When do the messages published
> in the Kafka changelog topics get deleted? What happens in that case?
>
> Thanks!
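For reference, a rough sketch of the aggregate-then-join approach described above. It is written against a newer Kafka Streams DSL (groupByKey().aggregate() rather than the aggregateByKey of 0.10.0), and the topic names, value types, and listOfStringSerde are only placeholders:

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KTable;
    import org.apache.kafka.streams.kstream.Materialized;

    StreamsBuilder builder = new StreamsBuilder();

    // Aggregate one stream into a KTable; its RocksDB store is backed by a
    // compacted changelog topic, as described above.
    KStream<String, String> xStream = builder.stream("topic-x");
    KTable<String, List<String>> xTable = xStream
        .groupByKey()
        .aggregate(
            ArrayList::new,                                        // initializer
            (key, value, agg) -> { agg.add(value); return agg; },  // collect each new value
            Materialized.with(Serdes.String(), listOfStringSerde)); // listOfStringSerde: placeholder Serde<List<String>>

    // Join the other stream against the aggregated table by key.
    KStream<String, String> yStream = builder.stream("topic-y");
    KStream<String, String> joined =
        yStream.join(xTable, (y, xList) -> xList + " + " + y);
    joined.to("topic-result");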
> 2016-09-06 13:33 GMT+02:00 Michael Noll <mich...@confluent.io>:
>
>> Also, another upcoming feature is (slightly simplified description follows)
>> "global" state/KTable. Today, a KTable is always partitioned/sharded. One
>> advantage of global state/tables will be for use cases where you need to
>> perform non-key joins, similar to what Guillermo described previously in
>> this thread. Keep in mind that global tables imply that each corresponding
>> stream task will get a full, replicated copy of the table, so you must pay
>> attention to the memory/disk footprint.
>>
>> On Tue, Sep 6, 2016 at 10:31 AM, Matthias J. Sax <matth...@confluent.io>
>> wrote:
>>
>>> Hi,
>>>
>>> currently, the DSL only supports primary-key joins. However, in your
>>> case, you have a non-primary-key join. There is already a JIRA to add
>>> support for this: https://issues.apache.org/jira/browse/KAFKA-3705
>>>
>>> Currently, you will need to use the Processor API. For the non-time-based
>>> input (if not too large) you could replicate it to all application
>>> instances and build a hash table (a simple in-memory HashMap might be
>>> sufficient). If the input is too large to fit into memory, you need to
>>> build a more sophisticated hash table that also uses the disk.
>>>
>>> For the replication, you will need an additional KafkaConsumer that
>>> assigns all partitions manually to itself and never commits its offsets
>>> (offset commits only work correctly if each partition is assigned only
>>> once -- but in your case, it would be assigned multiple times, depending
>>> on the overall parallelism of your Streams app).
>>>
>>> For the time-based input, you can just read it regularly, and for each
>>> record you do a lookup in the hash table to compute the join.
>>>
>>> Does this make sense?
>>>
>>>
>>> -Matthias
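A minimal sketch of the kind of side consumer described above, written against a newer consumer API: it assigns itself all partitions of the non-time-based topic, never commits offsets, and fills a simple in-memory hash table that the rest of the application can use for lookups. The broker address, topic name, and String key/value types are placeholder assumptions:

    import java.time.Duration;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.PartitionInfo;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.StringDeserializer;

    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");  // never commit offsets
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

    // Manually assign *all* partitions of the non-time-based topic to this instance.
    List<TopicPartition> partitions = new ArrayList<>();
    for (PartitionInfo info : consumer.partitionsFor("non-time-based-topic")) {
        partitions.add(new TopicPartition(info.topic(), info.partition()));
    }
    consumer.assign(partitions);
    consumer.seekToBeginning(partitions);  // re-read from the beginning on every restart

    // Simple in-memory hash table used for lookups by the time-based input.
    Map<String, String> lookupTable = new HashMap<>();
    while (true) {  // sketch only: run this loop on its own thread and add a shutdown condition
        for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(100))) {
            lookupTable.put(record.key(), record.value());
        }
    }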
>>>
>>> On 09/05/2016 11:28 PM, Guillermo Lammers Corral wrote:
>>>> Hi Matthias,
>>>>
>>>> Good question... the main problem is related to the nature of my data.
>>>> The first source of data is time based and the second one is not, but
>>>> both have a field with the same value (I don't know how to use it in the
>>>> join without it being the key. It can't be the key, let me explain why):
>>>>
>>>> ObjectX (sameValue, date, valueX)
>>>> ObjectY (uniqueId, sameValue, valueY)
>>>>
>>>> I want to create a result object based on X and Y using sameValue as the
>>>> "key", but there are some problems here:
>>>>
>>>>    - sameValue of ObjectX cannot be the key because I must take care of
>>>>    the date
>>>>    - sameValue of ObjectY cannot be the key because sameValue is not the
>>>>    key of ObjectX (we couldn't join anything)
>>>>    - uniqueId of ObjectY cannot be the key because it does not exist in
>>>>    ObjectX (we couldn't join anything)
>>>>    - I couldn't use something like sameValue_date as the key because
>>>>    date does not exist in ObjectY (we couldn't join anything)
>>>>
>>>> So, actually I don't know how to implement this using Kafka Streams. I
>>>> need to join data using a value field of each message (sameValue, but
>>>> not as the key) and do it indefinitely, because I don't know when data
>>>> will be sent, whereas the process will always be creating new result
>>>> objects.
>>>>
>>>> Basically, I want to use Kafka Streams to make joins between two sources
>>>> of data, but we cannot use KTable (key problems) and we cannot use a
>>>> windowed KStream (or we can, but with the memory issues you mentioned)
>>>> because I don't know when data will arrive and I cannot lose data (any
>>>> matching between both sources).
>>>>
>>>> Do you see any solution? Will I have to use the Processor API instead of
>>>> the DSL to spill data to disk as you said?
>>>>
>>>> Thanks in advance!
>>>>
>>>> 2016-09-05 20:00 GMT+02:00 Matthias J. Sax <matth...@confluent.io>:
>>>>
>>>>> Hey,
>>>>>
>>>>> are you sure you want to join everything? This will result in a huge
>>>>> memory footprint for your application. You are right that you cannot
>>>>> use KTable; however, windowed KStream joins would work -- you only need
>>>>> to specify a huge window (i.e., use Long.MAX_VALUE; this will
>>>>> effectively be "infinitely large") so that all data falls into a single
>>>>> window.
>>>>>
>>>>> The issue will be that all data will be buffered in memory, thus, if
>>>>> your application runs very long, it will eventually fail (I would
>>>>> assume). Thus, again my initial question: are you sure you want to join
>>>>> everything? (It's stream processing, not batch processing...)
>>>>>
>>>>> If the answer is still yes and you hit a memory issue, you will need to
>>>>> fall back to the Processor API instead of the DSL to spill data to disk
>>>>> if it does not fit into memory anymore (i.e., you will need to
>>>>> implement your own version of a symmetric hash join that spills to
>>>>> disk). Of course, the disk usage will also be huge. Eventually, your
>>>>> disk might also become too small...
>>>>>
>>>>> Can you clarify why you want to join everything? This does not sound
>>>>> like a good idea. Very large windows are manageable, but "infinite"
>>>>> windows are very problematic in stream processing.
>>>>>
>>>>>
>>>>> -Matthias
>>>>>
>>>>>
>>>>> On 09/05/2016 06:25 PM, Guillermo Lammers Corral wrote:
>>>>>> Hi,
>>>>>>
>>>>>> I've been thinking about how to solve one of my business processes
>>>>>> with Kafka Streams, without success for the moment. Hope someone can
>>>>>> help me.
>>>>>>
>>>>>> I am reading events like these from two topics (I'll simplify the
>>>>>> problem at this point):
>>>>>>
>>>>>> ObjectX
>>>>>> Key: String
>>>>>> Value: String
>>>>>>
>>>>>> ObjectY
>>>>>> Key: String
>>>>>> Value: String
>>>>>>
>>>>>> I want to do some kind of "join" for all events, without windowing but
>>>>>> also without them being KTables...
>>>>>>
>>>>>> Example:
>>>>>>
>>>>>> ==============================
>>>>>>
>>>>>> ObjectX("0001", "a") -> Topic A
>>>>>>
>>>>>> Expected output in TopicResult:
>>>>>>
>>>>>> nothing
>>>>>>
>>>>>> ==============================
>>>>>>
>>>>>> ObjectX("0001", "b") -> Topic A
>>>>>>
>>>>>> Expected output in TopicResult:
>>>>>>
>>>>>> nothing
>>>>>>
>>>>>> ==============================
>>>>>>
>>>>>> ObjectY("0001", "d") -> Topic B
>>>>>>
>>>>>> Expected output in TopicResult:
>>>>>>
>>>>>> ObjectZ("0001", ("a", "d"))
>>>>>> ObjectZ("0001", ("b", "d"))
>>>>>>
>>>>>> ==============================
>>>>>>
>>>>>> ObjectY("0001", "e") -> Topic B
>>>>>>
>>>>>> Expected output in TopicResult:
>>>>>>
>>>>>> ObjectZ("0001", ("a", "e"))
>>>>>> ObjectZ("0001", ("b", "e"))
>>>>>>
>>>>>> ==============================
>>>>>>
>>>>>> TopicResult at the end:
>>>>>>
>>>>>> ObjectZ("0001", ("a", "d"))
>>>>>> ObjectZ("0001", ("b", "d"))
>>>>>> ObjectZ("0001", ("a", "e"))
>>>>>> ObjectZ("0001", ("b", "e"))
>>>>>>
>>>>>> ==============================
>>>>>>
>>>>>> I think I can't use a KTable-KTable join because I want to match all
>>>>>> the events from the beginning of time. I can't use a KStream-KStream
>>>>>> join either, because it forces me to use windowing. Same for a
>>>>>> KStream-KTable join...
>>>>>>
>>>>>> Could any Kafka Streams expert help me with some tips?
>>>>>>
>>>>>> Thanks in advance.
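For reference, a rough sketch of the windowed KStream-KStream join workaround Matthias suggests above, with his caveat that everything gets buffered. The topic names are placeholders, the joiner just builds the "(x, y)" string from the example, a very large finite window stands in for the Long.MAX_VALUE mentioned above, and the JoinWindows factory method differs between Kafka Streams versions:

    import java.time.Duration;

    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.JoinWindows;
    import org.apache.kafka.streams.kstream.KStream;

    StreamsBuilder builder = new StreamsBuilder();
    KStream<String, String> xStream = builder.stream("topic-a");
    KStream<String, String> yStream = builder.stream("topic-b");

    // An effectively "infinite" window: records with the same key are joined
    // no matter how far apart in time they arrive, but all of them stay in the
    // join buffers (the memory/disk concern discussed above).
    KStream<String, String> joined = xStream.join(
        yStream,
        (x, y) -> "(" + x + ", " + y + ")",
        JoinWindows.of(Duration.ofDays(36500)));

    joined.to("topic-result");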
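The "global" tables Michael mentions further up later shipped as GlobalKTable, which supports exactly this kind of non-key join: each instance gets a full, replicated copy of the table, and the stream side extracts the lookup key (e.g. sameValue) from its value. A rough sketch against a later release, with placeholder names and a hypothetical extractSameValue helper:

    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.GlobalKTable;
    import org.apache.kafka.streams.kstream.KStream;

    StreamsBuilder builder = new StreamsBuilder();

    // Every application instance gets a full, replicated copy of this table.
    GlobalKTable<String, String> objectXTable = builder.globalTable("topic-x");

    KStream<String, String> objectYStream = builder.stream("topic-y");

    // Non-key join: the KeyValueMapper extracts the lookup key (e.g. sameValue)
    // from the stream record's value instead of using the record key.
    KStream<String, String> joined = objectYStream.join(
        objectXTable,
        (key, yValue) -> extractSameValue(yValue),  // extractSameValue: hypothetical helper
        (yValue, xValue) -> "(" + xValue + ", " + yValue + ")");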