Thanks for the information, very useful.

Finally, I have two KStreams. I use aggregateByKey on one of them to get a
<key, List[object]> (I've assumed that we'll receive this source first), so
I can join the result with the other KStream by key using a KStream-KTable
join.
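
For reference, the shape of what I have looks roughly like this (Java; the
topic names, types, and serde variables are placeholders, and I'm writing the
0.10.0 DSL signatures from memory, so they may need small adjustments):

KStreamBuilder builder = new KStreamBuilder();

KStream<String, ObjectX> streamX = builder.stream(stringSerde, objectXSerde, "topic-a");
KStream<String, ObjectY> streamY = builder.stream(stringSerde, objectYSerde, "topic-b");

// Collect all ObjectX values per key into a list; the result is a KTable
// backed by a local RocksDB store plus a changelog topic.
KTable<String, List<ObjectX>> aggregated = streamX.aggregateByKey(
    ArrayList::new,
    (key, value, list) -> { list.add(value); return list; },
    stringSerde,
    listOfObjectXSerde,
    "objectx-aggregate-store");

// Join the other stream against the aggregated table by key.
KStream<String, ObjectZ> joined = streamY.leftJoin(
    aggregated,
    (y, xs) -> new ObjectZ(y, xs));

joined.to(stringSerde, objectZSerde, "topic-result");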

Now I'm trying to understand the state store logic (RocksDB plus the
changelog topics).

Will the aggregated data always be available? When do the messages
published to the Kafka changelog topics get deleted? What happens in that
case?

Thanks!

2016-09-06 13:33 GMT+02:00 Michael Noll <mich...@confluent.io>:

> Also, another upcoming feature is (slightly simplified description follows)
> "global" state/KTable.  Today, a KTable is always partitioned/sharded.  One
> advantage of global state/tables will be for use cases where you need to
> perform non-key joins, similar to what Guillermo described previously in
> this thread.  Keep in mind that global tables imply that each
> corresponding stream task will get a full, replicated copy of the table, so
> you must pay attention to the memory/disk footprint.
>
> On Tue, Sep 6, 2016 at 10:31 AM, Matthias J. Sax <matth...@confluent.io>
> wrote:
>
> > Hi,
> >
> > Currently, only primary-key joins are supported in the DSL. However, in
> > your case you have a non-primary-key join. There is already a JIRA to add
> > support for this: https://issues.apache.org/jira/browse/KAFKA-3705
> >
> > Currently, you will need to use the Processor API. For the non-time-based
> > input (if it is not too large) you could replicate it to all application
> > instances and build a hash table (a simple in-memory HashMap might be
> > sufficient). If the input is too large to fit into memory, you will need
> > to build a more sophisticated hash table that also uses the disk.
> >
> > For replication, you will need an additional KafkaConsumer that manually
> > assigns all partitions to itself and never commits its offsets (offset
> > commits only work correctly if each partition is assigned exactly once --
> > but in your case a partition would be assigned multiple times, depending
> > on the overall parallelism of your Streams app).
> >
> > For the time-based input, you can just read it regularly, and for each
> > record do a lookup in the hash table to compute the join.
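> >
> > As a rough sketch of that idea (imports omitted; the topic names, the
> > extractSameValue() helper, the running flag, and the output format are
> > placeholders, and error handling and threading are ignored):
> >
> > // Side consumer, one per application instance, running on its own thread:
> > // replicate the complete non-time-based topic into a local map, using
> > // manual partition assignment and never committing offsets. This assumes
> > // the records of that topic are keyed by the join attribute (sameValue).
> > Properties cfg = new Properties();
> > cfg.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
> > cfg.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
> > cfg.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
> > cfg.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
> >
> > KafkaConsumer<String, String> consumer = new KafkaConsumer<>(cfg);
> > List<TopicPartition> partitions = new ArrayList<>();
> > for (PartitionInfo pi : consumer.partitionsFor("non-time-based-topic")) {
> >     partitions.add(new TopicPartition(pi.topic(), pi.partition()));
> > }
> > consumer.assign(partitions);          // no group management
> > consumer.seekToBeginning(partitions); // always start from the beginning
> >
> > Map<String, String> lookup = new ConcurrentHashMap<>();
> > while (running) {
> >     for (ConsumerRecord<String, String> r : consumer.poll(100)) {
> >         lookup.put(r.key(), r.value());
> >     }
> > }
> >
> > // Streams side: consume the time-based topic normally and look up each
> > // record in the map (shown with the DSL for brevity; a custom Processor
> > // would do the same thing).
> > KStream<String, String> timeBased = builder.stream("time-based-topic");
> > timeBased
> >     .filter((k, v) -> lookup.containsKey(extractSameValue(v)))
> >     .mapValues(v -> v + "|" + lookup.get(extractSameValue(v)))
> >     .to("joined-output-topic");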
> >
> > Does this make sense?
> >
> >
> > -Matthias
> >
> > On 09/05/2016 11:28 PM, Guillermo Lammers Corral wrote:
> > > Hi Matthias,
> > >
> > > Good question... the main problem is related to the nature of my data.
> > > The first source of data is time-based and the second one is not, but
> > > both have a field with the same value (I don't know how to use it in the
> > > join without it being the key; it can't be, and let me explain why):
> > >
> > > ObjectX (sameValue, date, valueX)
> > > ObjectY (uniqueId, sameValue, valueY)
> > >
> > > I want to create a result object based on X and Y using sameValue as the
> > > "key", but there are some problems here:
> > >
> > >    - sameValue of ObjectX cannot be the key because I must take care of
> > >    the date
> > >    - sameValue of ObjectY cannot be the key because sameValue is not the
> > >    key of ObjectX (we couldn't join anything)
> > >    - uniqueId of ObjectY cannot be the key because it does not exist in
> > >    ObjectX (we couldn't join anything)
> > >    - I couldn't use something like sameValue_date as the key because date
> > >    does not exist in ObjectY (we couldn't join anything)
> > >
> > > So, actually I don't know how to implement this using Kafka Streams. I
> > > need to join data using a value field of each message (sameValue, but not
> > > as the key) and do it indefinitely, because I don't know when data will
> > > be sent, while the process must always keep creating new result objects.
> > >
> > > Basically, I want to use Kafka Streams to join two sources of data, but
> > > we cannot use a KTable (key problems) and we cannot use a windowed
> > > KStream (or we can, but with the memory issues you mentioned), because I
> > > don't know when data will arrive and I cannot lose data (any match
> > > between both sources).
> > >
> > > Do you see any solution? Will I have to use the Processor API instead of
> > > the DSL to spill data to disk, as you said?
> > >
> > > Thanks in advance!
> > >
> > > 2016-09-05 20:00 GMT+02:00 Matthias J. Sax <matth...@confluent.io>:
> > >
> > >> Hey,
> > >>
> > >> are you sure you want to join everything? This will result in a huge
> > >> memory footprint for your application. You are right that you cannot use
> > >> a KTable; however, windowed KStream joins would work -- you only need to
> > >> specify a huge window (i.e., use Long.MAX_VALUE, which will effectively
> > >> be "infinitely large") so that all data falls into a single window.
> > >>
> > >> The issue will be that all data will be buffered in memory, so if your
> > >> application runs for a very long time, it will eventually fail (I would
> > >> assume). Thus, again my initial question: are you sure you want to join
> > >> everything? (It's stream processing, not batch processing...)
> > >>
> > >> If the answer is still yes and you hit a memory issue, you will need to
> > >> fall back to the Processor API instead of the DSL to spill data to disk
> > >> when it does not fit into memory anymore (i.e., you will need to
> > >> implement your own version of a symmetric hash join that spills to
> > >> disk). Of course, the disk usage will also be huge. Eventually, your
> > >> disk might also become too small...
> > >>
> > >> Can you clarify why you want to join everything? This does not sound
> > >> like a good idea. Very large windows are manageable, but "infinite"
> > >> windows are very problematic in stream processing.
> > >>
> > >>
> > >> -Matthias
> > >>
> > >>
> > >> On 09/05/2016 06:25 PM, Guillermo Lammers Corral wrote:
> > >>> Hi,
> > >>>
> > >>> I've been thinking about how to solve one of my business processes
> > >>> with Kafka Streams, without success so far. I hope someone can help me.
> > >>>
> > >>> I am reading events like these from two topics (I'll simplify the
> > >>> problem at this point):
> > >>>
> > >>> ObjectX
> > >>> Key: String
> > >>> Value: String
> > >>>
> > >>> ObjectY
> > >>> Key: String
> > >>> Value: String
> > >>>
> > >>> I want to do some kind of "join" for all events, without windowing but
> > >>> also without them being KTables...
> > >>>
> > >>> Example:
> > >>>
> > >>> ==============================
> > >>>
> > >>> ObjectX("0001", "a") -> TopicA
> > >>>
> > >>> Expected output TopicResult:
> > >>>
> > >>> nothing
> > >>>
> > >>> ==============================
> > >>>
> > >>> ObjectX("0001", "b") -> Topic A
> > >>>
> > >>> Expected output TopicResult:
> > >>>
> > >>> nothing
> > >>>
> > >>> ==============================
> > >>>
> > >>> ObjectY("0001", "d") -> Topic B:
> > >>>
> > >>> Expected output TopicResult:
> > >>>
> > >>> ObjectZ("0001", ("a", "d"))
> > >>> ObjectZ("0001", ("b", "d"))
> > >>>
> > >>> ==============================
> > >>>
> > >>> ObjectY("0001", "e") -> Topic B:
> > >>>
> > >>> Expected output TopicResult:
> > >>>
> > >>> ObjectZ("0001", ("a", "e"))
> > >>> ObjectZ("0001", ("b", "e"))
> > >>>
> > >>> ==============================
> > >>>
> > >>> TopicResult at the end:
> > >>>
> > >>> ObjectZ("0001", ("a", "d"))
> > >>> ObjectZ("0001", ("b", "d"))
> > >>> ObjectZ("0001", ("a", "e"))
> > >>> ObjectZ("0001", ("b", "e"))
> > >>>
> > >>> ==============================
> > >>>
> > >>> I think I can't use a KTable-KTable join because I want to match all
> > >>> the events from the beginning of time. I also can't use a
> > >>> KStream-KStream join because it forces me to use windowing. Same for a
> > >>> KStream-KTable join...
> > >>>
> > >>> Could any Kafka Streams expert help me with some tips?
> > >>>
> > >>> Thanks in advance.
> > >>>
> > >>
> > >>
> > >
> >
> >
>
