I think multi-way stream-stream join, beyond table-table join would be a
good to add. About joining with different keys, we have foreign-key join
for KTables only at the moment (KIP-213), and maybe we can follow that
direction as well.

Also in your case, if you can manage to transform your stream-stream join
to table-table join maybe the existing API would be more appealing to apply
:)


Guozhang

On Sun, Feb 23, 2020 at 8:29 AM Sachin Mittal <sjmit...@gmail.com> wrote:

> Hi,
> KIP-150 is indeed cool, and I suppose it would be released as part of 2.5.
> I can see some use cases of the new api where one can avoid multiple
> aggregations.
>
> I believe in the same lines if we can introduce two more api's.
> 1. to join two streams having different keys. This would help in trying to
> avoid transforming the streams to matchable keys.
> It can be something like:
> <KA,VA>stream
>  . join(
>     KStream<KB,VB> otherStream,
>     (ka, kb) -> //return true/false (true means it can be joined)
>     (va, vb) -> ...
>   )
> So basically we introduce a key joiner which will compare two different
> keys and return true/value to indicate if their values can be joined or
> not.
> The resultant stream can be of type (KA, VA+VB)
> So here we have basically enriched streamA with values of streamB using
> simple join.
>
> 2. In lines of KIP-150 an api to join multiple streams something like:
> <K,VA>stream
>  . join(
>     KStream<K,VB> otherStream1,
>     KStream<K,VC> otherStream2,
>     ...
>     (va, vb, vc, ...) -> ...
>   )
> This way we can join multiple streams and whose resultant stream if of type
> (K, VA+VB+VC...)
>
> Let me know if something like this can be useful.
>
> Thanks
> Sachin
>
>
>
> On Sun, Feb 23, 2020 at 5:40 AM Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hi Sachin,
> >
> > You can check first by topology.describe() if there are indeed some
> > repartition topics existing in your topology. If there's none, you may
> also
> > want to check if the children operators of the split is in the order of
> > (AA, AB, B): if the B sub-stream is the first child, then it may also
> > trigger fetch before the join.
> >
> > If that's the case, I'd suggest maybe you can consider some multi-way
> join
> > operations similar to "co-group" (KIP-150): I mean, you can implemented
> > something like that, not exactly that than relying on the existing
> in-built
> > operators, because what you've described here really sounds like a
> > three-way join to me except for streamB it is not joined by key but by
> time
> > window.
> >
> > Guozhang
> >
> >
> >
> > On Fri, Feb 21, 2020 at 8:31 PM Sachin Mittal <sjmit...@gmail.com>
> wrote:
> >
> > > Hi,
> > > So in my example what I notice is that records order is:
> > > [1, (KA, AA1)] [2, (KA, AB1)] [3, (KB, B1)]
> > > Stream is branched as:
> > > input
> > >     .branch(
> > >         (k, v) -> //filter records of type AA,
> > >         (k, v) -> //filter records of type AB,
> > >         (k, v) -> //filter records of type B,
> > >     )
> > > Topology is
> > > streamAA.join(streamAB).process(/*put joined value to state store*/)
> > > streamB.transformValues(/*fetch joined value from state store and
> enrich
> > > the value*/)
> > >
> > > Once I see the logs I see the following order:
> > > (KA, AA1)
> > > (KA, AB1)
> > > fetch for joined data
> > > (KB, B1)
> > > join [ (KA, AA1), (KA, AB1) ]
> > > put joined record
> > >
> > > Clearly it is trying to fetch data before it is made available to the
> > state
> > > store and that would be because join operation happens after the
> streamB
> > > record is read.
> > > As you mentioned that if there is re-partition then it may not hold.
> > > I wanted to know when does kafka automatically re-partition the topics.
> > > Just for simplicity in the example, I had omitted lot of steps like:
> > > After branching I actually re-key each stream using map, and then
> apply a
> > > transformer and again re-key them.
> > >
> > > If this creates re-partition of these streams and order may not hold,
> > then
> > > what may be a good way to ensure streamB records are enriched with the
> > > joined data of streamAA and streamAB.
> > >
> > > Thanks
> > > Sachin
> > >
> > >
> > >
> > > On Sat, Feb 22, 2020 at 2:33 AM Guozhang Wang <wangg...@gmail.com>
> > wrote:
> > >
> > > > From the description it seems there's no repartition topics
> throughout
> > > your
> > > > topology, in that case the ordering should be guaranteed. I.e. a
> record
> > > > from the input topic would only be processed after all previous
> records
> > > > from that same topics have been processed entirely; if there's
> > > repartition
> > > > topics in between though, this may not hold.
> > > >
> > > > Also a minor thing that if your streamAA.join(streamBB) results are
> > only
> > > > needed for populating the store, you can use `process(..)` instead of
> > > > `transform(..)` as it would not need a return value since it would be
> > the
> > > > end of this branch of the topology.
> > > >
> > > >
> > > > Guozhang
> > > >
> > > > On Thu, Feb 20, 2020 at 7:32 PM Sachin Mittal <sjmit...@gmail.com>
> > > wrote:
> > > >
> > > > > Hi,
> > > > > I wanted to understand if in this particular case my solution would
> > > work:
> > > > > Say I have source records [timestamp, (K,V)] in input topic in
> > > following
> > > > > order:
> > > > > .. [1, (KA, AA1)] [2, (KA, AB1)] [3, (KB, B1)] ...
> > > > >
> > > > > I create multiple streams out of input stream as:
> > > > > input
> > > > >     .branch(
> > > > >         (k, v) -> //filter records of type AA,
> > > > >         (k, v) -> //filter records of type AB,
> > > > >         (k, v) -> //filter records of type B,
> > > > >     )
> > > > > Then my topology is written in following way:
> > > > > //join stream of type AA with AB and push its value in state store
> > > > > streamAA.join(streamAB).transform( new TransformerSupplier() {
> > > > transform(K
> > > > > key, V value) {
> > > > >                 // timestamp = extract timestamp from (key, value)
> > > > >                 enrichMessageStore.put(key , value, timestamp);
> > > > >                 return new KeyValue(key, value);
> > > > > } } , "enrich-message-store")
> > > > > //fetch the data from that state store and enrich streamB
> > > > > streamB.transform(new TransformerSupplier() { transform(K key, V
> > > value) {
> > > > >                 // (from, to) = extract (from, to) from (key,
> value)
> > > > >                 result = enrichMessageStore.fetchAll( from, to );
> > > > >                 //mutate value using value.enrich(result)
> > > > >                 return new KeyValue(key, value);
> > > > > } } , "enrich-message-store");
> > > > >
> > > > > So does kafka stream ensure that records of streamB would be
> > processed
> > > > only
> > > > > after records of streamAA and streamAB are joined since they are in
> > > > order?
> > > > > Because if the operation of streamAA.join(streamAB) happens after
> > > > > streamB.transform()
> > > > > then it will not work.
> > > > > I am assuming that since streamAA and streamAB types of records are
> > > > before
> > > > > streamB type of record in the input topic the join will also happen
> > > > before.
> > > > > If this assumption is not safe then is there any other way of
> > ensuring.
> > > > >
> > > > > For now lets assume there is single partition of the input topic.
> > > > >
> > > > > Thanks
> > > > > Sachin
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > On Fri, Feb 21, 2020 at 4:57 AM Guozhang Wang <wangg...@gmail.com>
> > > > wrote:
> > > > >
> > > > > > Hello Sachin,
> > > > > >
> > > > > > 1) It seems from your source code, that in the stream2.transform
> > you
> > > > are
> > > > > > generating a new value and return a new key-value pair:
> > > > > >
> > > > > > mutate value = enrich(value, result)
> > > > > > return new KeyValue(key, value);
> > > > > >
> > > > > > ---------------
> > > > > >
> > > > > > Anyways, if you do not want to generate a new value object, and
> > just
> > > > > have a
> > > > > > field function like this:
> > > > > >
> > > > > > value.enrich(result)
> > > > > > return new KeyValue(key, value);
> > > > > >
> > > > > > ---------------
> > > > > >
> > > > > > That actually still works as long as your serde function
> recognize
> > > the
> > > > > > optional enriched fields and can encode / decode the value object
> > > with
> > > > /
> > > > > > without the enriched fields.
> > > > > >
> > > > > > 2) And regarding your join scenario, if your join does not depend
> > on
> > > > any
> > > > > > field, but only depending on the time-range (since you used
> > > fetchAll()
> > > > > > which would return ALL the key-values falling into that range) I
> > > think
> > > > > this
> > > > > > would be the way to do it.
> > > > > >
> > > > > >
> > > > > > Guozhang
> > > > > >
> > > > > >
> > > > > > On Wed, Feb 19, 2020 at 10:25 PM Sachin Mittal <
> sjmit...@gmail.com
> > >
> > > > > wrote:
> > > > > >
> > > > > > > Hi,
> > > > > > > I have two streams and I want to enrich stream2 records based
> off
> > > > > stream1
> > > > > > > records.
> > > > > > > I really cannot join those two streams is because there is no
> > > common
> > > > > key
> > > > > > > between them.
> > > > > > > Hence only way I can do that is using a timestamp field
> property.
> > > > > > >
> > > > > > > This is how I have built my pipeline.
> > > > > > > .....
> > > > > > > //create and add state store
> > > > > > > final StoreBuilder<WindowStore<K, V>>> enrichStoreBuilder =
> > > > > > > Stores.windowStoreBuilder(
> > > > > > >
>  Stores.persistentWindowStore("enrich-message-store",
> > > > > > > retentionSize, windowSize, false), ...);
> > > > > > > builder.addStateStore(enrichStoreBuilder)
> > > > > > > .....
> > > > > > > //push the data to state store
> > > > > > > stream1.transform(new TransformerSupplier() { transform(K key,
> V
> > > > > value) {
> > > > > > >                 // timestamp = extract timestamp from (key,
> > value)
> > > > > > >                 enrichMessageStore.put(key , value, timestamp);
> > > > > > >                 return new KeyValue(key, value);
> > > > > > > } } , " enrich-message-store");
> > > > > > > .....
> > > > > > > //fetch the data from state store for
> > > > > > > stream2.transform(new TransformerSupplier() { transform(K key,
> V
> > > > > value) {
> > > > > > >                 // (from, to) = extract (from, to) from (key,
> > > value)
> > > > > > >                 result = enrichMessageStore.fetchAll( from, to
> );
> > > > > > >                 //mutate value = enrich(value, result)
> > > > > > >                 return new KeyValue(key, value);
> > > > > > > } } , " enrich-message-store");
> > > > > > > .......
> > > > > > >
> > > > > > > So is this something that would work. Basically in one stream
> > > > > > > transformation a state store is updated.
> > > > > > > And in second stream transformation values from that state
> stores
> > > are
> > > > > > > fetched and stream2 value is enriched from those values.
> > > > > > > The enriched (mutated) value is returned back to downstream.
> > > > > > >
> > > > > > > So will this work and most important can we mutate the input
> > value
> > > > > itself
> > > > > > > in transform function and return the same or we should always
> > > create
> > > > > new
> > > > > > > instances of the same.
> > > > > > >
> > > > > > > Is there any better way of doing this.
> > > > > > >
> > > > > > > Thanks
> > > > > > > Sachin
> > > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > -- Guozhang
> > > > > >
> > > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> >
> >
> > --
> > -- Guozhang
> >
>


-- 
-- Guozhang

Reply via email to