Thanks for the information, very useful. In the end, I have two KStreams. I use aggregateByKey on one of them to get a <key, List[object]> (I've assumed we'll receive this source first), so I can join the result with the other KStream by key using a KStream-KTable join.
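(A rough sketch of that aggregate-then-join topology, written with the newer groupByKey().aggregate() DSL spelling instead of the older aggregateByKey call; the ObjectX/ObjectY/ObjectZ classes, topic names, and serdes below are hypothetical placeholders.)

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.*;

    StreamsBuilder builder = new StreamsBuilder();

    // The stream received first is aggregated per key into a list (hypothetical ObjectY type/serde).
    KStream<String, ObjectY> streamY =
        builder.stream("topic-b", Consumed.with(Serdes.String(), objectYSerde));

    KTable<String, List<ObjectY>> aggregated = streamY
        .groupByKey()
        .aggregate(
            ArrayList::new,                                        // start with an empty list
            (key, value, agg) -> { agg.add(value); return agg; },  // append each ObjectY
            Materialized.with(Serdes.String(), listOfObjectYSerde));

    // The other stream is joined against the aggregated table by key.
    KStream<String, ObjectX> streamX =
        builder.stream("topic-a", Consumed.with(Serdes.String(), objectXSerde));

    KStream<String, ObjectZ> joined =
        streamX.join(aggregated, (x, ys) -> new ObjectZ(x, ys));   // one output per ObjectX record

    joined.to("topic-result", Produced.with(Serdes.String(), objectZSerde));

(The KStream-KTable join is not windowed, but a stream-side record only finds a match if the table side already contains its key, which is why the "received first" assumption matters.)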
Now I'm trying to understand the state store logic (RocksDB plus changelog topics). Will the aggregated data always be available? When do the messages published to the Kafka changelog topics get deleted? What happens in that case? Thanks!

2016-09-06 13:33 GMT+02:00 Michael Noll <mich...@confluent.io>:

> Also, another upcoming feature is (slightly simplified description follows) "global" state/KTable. Today, a KTable is always partitioned/sharded. One advantage of global state/tables will be for use cases where you need to perform non-key joins, similar to what Guillermo described previously in this thread. Keep in mind that global tables imply that each corresponding stream task will get a full, replicated copy of the table, so you must pay attention to the memory/disk footprint.
>
> On Tue, Sep 6, 2016 at 10:31 AM, Matthias J. Sax <matth...@confluent.io> wrote:
>
> > Hi,
> >
> > currently, only primary-key joins are supported in the DSL. However, in your case you have a non-primary-key join. There is already a JIRA to add support for this: https://issues.apache.org/jira/browse/KAFKA-3705
> >
> > Currently, you will need to use the Processor API. For the non-time-based input (if it is not too large) you could replicate it to all application instances and build a hash table (a simple in-memory HashMap might be sufficient). If the input is too large to fit into memory, you need to build a more sophisticated hash table that also uses the disk.
> >
> > For replication, you will need an additional KafkaConsumer that manually assigns all partitions to itself and never commits its offsets (offset commits only work correctly if each partition is assigned exactly once -- but in your case, a partition would be assigned multiple times, depending on the overall parallelism of your Streams app).
> >
> > For the time-based input, you can just read it regularly, and for each record do a look-up in the hash table to compute the join.
> >
> > Does this make sense?
> >
> >
> > -Matthias
> >
> > On 09/05/2016 11:28 PM, Guillermo Lammers Corral wrote:
> > > Hi Matthias,
> > >
> > > Good question... the main problem is related to the nature of my data. The first source of data is time-based and the second one is not, but both have a field with the same value (I don't know how to use it in the join without it being the key; it can't be, let me explain why):
> > >
> > > ObjectX (sameValue, date, valueX)
> > > ObjectY (uniqueId, sameValue, valueY)
> > >
> > > I want to create a result object based on X and Y using sameValue as the "key", but there are some problems here:
> > >
> > >    - sameValue of ObjectX cannot be the key because I must take care of date
> > >    - sameValue of ObjectY cannot be the key because sameValue is not the key of ObjectX (we couldn't join anything)
> > >    - uniqueId of ObjectY cannot be the key because it does not exist in ObjectX (we couldn't join anything)
> > >    - I couldn't use something like sameValue_date as the key because date does not exist in ObjectY (we couldn't join anything)
> > >
> > > So, actually I don't know how to implement this using Kafka Streams. I need to join data using a value field of each message (sameValue, but not as the key) and do it indefinitely, because I don't know when data will be sent while the process will always be creating new result objects.
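(For reference, the manually-assigned consumer Matthias describes above could look roughly like this. The topic name, bootstrap servers, and String key/value types are placeholder assumptions, and we assume here that the join field is the record key -- otherwise it would be extracted from the value.)

    import java.time.Duration;
    import java.util.*;
    import org.apache.kafka.clients.consumer.*;
    import org.apache.kafka.common.PartitionInfo;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.StringDeserializer;

    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");   // never commit offsets
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    KafkaConsumer<String, String> consumer =
        new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer());

    // Manually assign ALL partitions of the small, non-time-based topic,
    // so that every application instance builds a full local copy.
    List<TopicPartition> partitions = new ArrayList<>();
    for (PartitionInfo p : consumer.partitionsFor("non-time-based-topic")) {
        partitions.add(new TopicPartition(p.topic(), p.partition()));
    }
    consumer.assign(partitions);
    consumer.seekToBeginning(partitions);

    // In-memory lookup table keyed by the shared join field ("sameValue").
    Map<String, List<String>> lookup = new HashMap<>();
    while (true) {
        for (ConsumerRecord<String, String> rec : consumer.poll(Duration.ofMillis(500))) {
            lookup.computeIfAbsent(rec.key(), k -> new ArrayList<>()).add(rec.value());
        }
        // The time-based input is processed elsewhere (e.g. in a Processor)
        // and probes `lookup` for each incoming record to compute the join.
    }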
> > > Basically, I want to use streaming with Kafka Streams to make joins between two sources of data, but we cannot use KTables (key problems) and we cannot use windowed KStreams (or we can, but with the memory issues you mentioned), because I don't know when data will arrive and I cannot lose data (any match between both sources).
> > >
> > > Do you see any solution? Will I have to use the Processor API instead of the DSL to spill data to disk, as you said?
> > >
> > > Thanks in advance!
> > >
> > > 2016-09-05 20:00 GMT+02:00 Matthias J. Sax <matth...@confluent.io>:
> > >
> > >> Hey,
> > >>
> > >> are you sure you want to join everything? This will result in a huge memory footprint for your application. You are right that you cannot use KTables; however, windowed KStream joins would work -- you only need to specify a huge window (i.e., use Long.MAX_VALUE; this will effectively be "infinitely large") so that all data falls into a single window.
> > >>
> > >> The issue will be that all data is buffered in memory; thus, if your application runs for a very long time, it will eventually fail (I would assume). So, again my initial question: are you sure you want to join everything? (It's stream processing, not batch processing...)
> > >>
> > >> If the answer is still yes and you hit a memory issue, you will need to fall back to the Processor API instead of the DSL to spill data to disk once it no longer fits into memory (i.e., you will need to implement your own version of a symmetric hash join that spills to disk). Of course, the disk usage will also be huge. Eventually, your disk might also become too small...
> > >>
> > >> Can you clarify why you want to join everything? This does not sound like a good idea. Very large windows are manageable, but "infinite" windows are very problematic in stream processing.
> > >>
> > >>
> > >> -Matthias
> > >>
> > >>
> > >> On 09/05/2016 06:25 PM, Guillermo Lammers Corral wrote:
> > >>> Hi,
> > >>>
> > >>> I've been thinking about how to solve one of my business processes with Kafka Streams, without success so far. I hope someone can help me.
> > >>>
> > >>> I am reading events like these from two topics (I'll simplify the problem at this point):
> > >>>
> > >>> ObjectX
> > >>> Key: String
> > >>> Value: String
> > >>>
> > >>> ObjectY
> > >>> Key: String
> > >>> Value: String
> > >>>
> > >>> I want to do some kind of "join" of all events without windowing, but also without them being KTables...
> > >>>
> > >>> Example:
> > >>>
> > >>> ==============================
> > >>>
> > >>> ObjectX("0001", "a") -> Topic A
> > >>>
> > >>> Expected output in TopicResult:
> > >>>
> > >>> nothing
> > >>>
> > >>> ==============================
> > >>>
> > >>> ObjectX("0001", "b") -> Topic A
> > >>>
> > >>> Expected output in TopicResult:
> > >>>
> > >>> nothing
> > >>>
> > >>> ==============================
> > >>>
> > >>> ObjectY("0001", "d") -> Topic B
> > >>>
> > >>> Expected output in TopicResult:
> > >>>
> > >>> ObjectZ("0001", ("a", "d"))
> > >>> ObjectZ("0001", ("b", "d"))
> > >>>
> > >>> ==============================
> > >>>
> > >>> ObjectY("0001", "e") -> Topic B
> > >>>
> > >>> Expected output in TopicResult:
> > >>>
> > >>> ObjectZ("0001", ("a", "e"))
> > >>> ObjectZ("0001", ("b", "e"))
> > >>>
> > >>> ==============================
> > >>>
> > >>> TopicResult at the end:
> > >>>
> > >>> ObjectZ("0001", ("a", "d"))
> > >>> ObjectZ("0001", ("b", "d"))
> > >>> ObjectZ("0001", ("a", "e"))
> > >>> ObjectZ("0001", ("b", "e"))
> > >>>
> > >>> ==============================
> > >>>
> > >>> I think I can't use a KTable-KTable join because I want to match all the events from the beginning of time. I can't use a KStream-KStream join either, because it forces me to use windowing. Same for a KStream-KTable join...
> > >>>
> > >>> Could any Kafka Streams expert help me with some tips?
> > >>>
> > >>> Thanks in advance.
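(For reference, the "huge window" KStream-KStream join suggested earlier in the thread would look roughly like this, written against a recent Java DSL; topic names and serdes are placeholder assumptions, and the memory/disk caveats raised above still apply.)

    import java.time.Duration;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.*;

    StreamsBuilder builder = new StreamsBuilder();
    KStream<String, String> xStream =
        builder.stream("topic-a", Consumed.with(Serdes.String(), Serdes.String()));
    KStream<String, String> yStream =
        builder.stream("topic-b", Consumed.with(Serdes.String(), Serdes.String()));

    // A very large window (~100 years), so that in practice all records with the
    // same key fall into a single window and can always match each other.
    JoinWindows everything = JoinWindows.of(Duration.ofDays(36500));

    KStream<String, String> joined = xStream.join(
        yStream,
        (x, y) -> "(" + x + ", " + y + ")",          // stand-in for building ObjectZ
        everything,
        StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String()));

    joined.to("topic-result", Produced.with(Serdes.String(), Serdes.String()));

(With key "0001" this emits exactly the ObjectZ("0001", ("a", "d")) ... cross-product from the example above, at the cost of keeping every record in the join's window stores indefinitely.)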