Hi,
KIP-150 is indeed cool, and I suppose it would be released as part of 2.5.
I can see some use cases of the new api where one can avoid multiple
aggregations.

Along the same lines, I wonder whether we could introduce two more APIs.
1. An API to join two streams that have different keys. This would avoid
having to re-key the streams just to make their keys match.
It could look something like:
<KA,VA>stream
 .join(
    KStream<KB,VB> otherStream,
    (ka, kb) -> ..., // key joiner: true means the two records can be joined
    (va, vb) -> ...  // value joiner
  )
So basically we introduce a key joiner that compares the two different
keys and returns true/false to indicate whether their values can be joined.
The resultant stream can be of type (KA, VA+VB).
Here we have basically enriched streamA with values of streamB using a
simple join.
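
To make the key-joiner idea concrete, here is a rough sketch in plain Java.
It does not use Kafka Streams at all: KeyJoinerSketch, its join() helper and
the sample records are hypothetical names made up for illustration, and it
runs a nested loop over two finite lists, so it ignores windowing and
partitioning entirely. It only shows the intended semantics: the key joiner
decides which pairs match, and the output keeps the left key (KA).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.BiPredicate;

public class KeyJoinerSketch {

    // Hypothetical helper, NOT a Kafka Streams API: joins two record lists
    // whose keys have different types, using a predicate over both keys.
    public static <KA, VA, KB, VB, VR> List<Map.Entry<KA, VR>> join(
            List<Map.Entry<KA, VA>> left,
            List<Map.Entry<KB, VB>> right,
            BiPredicate<KA, KB> keyJoiner,
            BiFunction<VA, VB, VR> valueJoiner) {
        List<Map.Entry<KA, VR>> out = new ArrayList<>();
        for (Map.Entry<KA, VA> a : left) {
            for (Map.Entry<KB, VB> b : right) {
                // Emit a joined record only when the key joiner accepts the pair.
                if (keyJoiner.test(a.getKey(), b.getKey())) {
                    out.add(Map.entry(a.getKey(),
                            valueJoiner.apply(a.getValue(), b.getValue())));
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // streamA is keyed by String, streamB by Integer (sample data).
        List<Map.Entry<String, String>> streamA =
                List.of(Map.entry("user-1", "click"), Map.entry("user-2", "view"));
        List<Map.Entry<Integer, String>> streamB =
                List.of(Map.entry(1, "Alice"), Map.entry(3, "Carol"));

        List<Map.Entry<String, String>> joined = join(
                streamA, streamB,
                (ka, kb) -> ka.equals("user-" + kb), // key joiner
                (va, vb) -> va + "+" + vb);          // value joiner

        System.out.println(joined);
    }
}
```

In a real stream join the comparison would of course have to be bounded by a
join window, and records with matching keys would have to end up on the same
task, which is exactly the part such an API would need to solve.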

2. Along the lines of KIP-150, an API to join multiple streams, something like:
<K,VA>stream
 . join(
    KStream<K,VB> otherStream1,
    KStream<K,VC> otherStream2,
    ...
    (va, vb, vc, ...) -> ...
  )
This way we can join multiple streams, and the resultant stream is of type
(K, VA+VB+VC...).
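
Again as a rough illustration only, the multi-way variant could behave like
the plain-Java sketch below. MultiJoinSketch, TriJoiner and join() are
hypothetical names, not Kafka Streams APIs. Each input is modelled as a map
holding the latest value per key, and the sketch assumes inner-join
semantics (a key must be present in all three inputs); windowing is ignored.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class MultiJoinSketch {

    // Hypothetical three-argument value joiner: (va, vb, vc) -> result.
    @FunctionalInterface
    public interface TriJoiner<A, B, C, R> {
        R apply(A a, B b, C c);
    }

    // Hypothetical helper: inner join of three inputs that share key type K.
    public static <K, VA, VB, VC, VR> Map<K, VR> join(
            Map<K, VA> a, Map<K, VB> b, Map<K, VC> c,
            TriJoiner<VA, VB, VC, VR> joiner) {
        Map<K, VR> out = new LinkedHashMap<>();
        for (Map.Entry<K, VA> e : a.entrySet()) {
            K key = e.getKey();
            // Only keys present in all three inputs produce a joined value.
            if (b.containsKey(key) && c.containsKey(key)) {
                out.put(key, joiner.apply(e.getValue(), b.get(key), c.get(key)));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> streamA = Map.of("k1", "a1", "k2", "a2");
        Map<String, String> streamB = Map.of("k1", "b1");
        Map<String, String> streamC = Map.of("k1", "c1", "k3", "c3");

        Map<String, String> joined =
                join(streamA, streamB, streamC, (va, vb, vc) -> va + vb + vc);

        System.out.println(joined);
    }
}
```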

Let me know if something like this can be useful.

Thanks
Sachin



On Sun, Feb 23, 2020 at 5:40 AM Guozhang Wang <wangg...@gmail.com> wrote:

> Hi Sachin,
>
> You can first check with topology.describe() whether there are indeed any
> repartition topics in your topology. If there are none, you may also want
> to check whether the child operators of the split are in the order
> (AA, AB, B): if the B sub-stream is the first child, then it may also
> trigger the fetch before the join.
>
> If that's the case, I'd suggest you consider a multi-way join operation
> similar to "co-group" (KIP-150): I mean, you can implement something like
> that (not exactly that) yourself rather than relying on the existing
> built-in operators, because what you've described here really sounds like
> a three-way join to me, except that streamB is joined not by key but by
> time window.
>
> Guozhang
>
>
>
> On Fri, Feb 21, 2020 at 8:31 PM Sachin Mittal <sjmit...@gmail.com> wrote:
>
> > Hi,
> > So in my example, the record order I see is:
> > [1, (KA, AA1)] [2, (KA, AB1)] [3, (KB, B1)]
> > Stream is branched as:
> > input
> >     .branch(
> >         (k, v) -> //filter records of type AA,
> >         (k, v) -> //filter records of type AB,
> >         (k, v) -> //filter records of type B,
> >     )
> > Topology is
> > streamAA.join(streamAB).process(/*put joined value to state store*/)
> > streamB.transformValues(/*fetch joined value from state store and enrich
> > the value*/)
> >
> > When I check the logs, I see the following order:
> > (KA, AA1)
> > (KA, AB1)
> > fetch for joined data
> > (KB, B1)
> > join [ (KA, AA1), (KA, AB1) ]
> > put joined record
> >
> > Clearly it is trying to fetch data before it is made available in the
> > state store, and that would be because the join operation happens after
> > the streamB record is read.
> > As you mentioned, if there is a repartition then the ordering may not hold.
> > I wanted to know when Kafka automatically repartitions the topics.
> > For simplicity I had omitted a lot of steps in the example:
> > after branching I actually re-key each stream using map, then apply a
> > transformer, and then re-key them again.
> >
> > If this causes these streams to be repartitioned, so that the order may
> > not hold, what would be a good way to ensure that streamB records are
> > enriched with the joined data of streamAA and streamAB?
> >
> > Thanks
> > Sachin
> >
> >
> >
> > On Sat, Feb 22, 2020 at 2:33 AM Guozhang Wang <wangg...@gmail.com> wrote:
> >
> > > From the description it seems there are no repartition topics in your
> > > topology; in that case the ordering should be guaranteed. I.e. a record
> > > from the input topic would only be processed after all previous records
> > > from that same topic have been processed entirely; if there are
> > > repartition topics in between though, this may not hold.
> > >
> > > Also a minor thing: if your streamAA.join(streamAB) results are only
> > > needed for populating the store, you can use `process(..)` instead of
> > > `transform(..)`, as it would not need a return value since it would be
> > > the end of this branch of the topology.
> > >
> > >
> > > Guozhang
> > >
> > > On Thu, Feb 20, 2020 at 7:32 PM Sachin Mittal <sjmit...@gmail.com> wrote:
> > >
> > > > Hi,
> > > > I wanted to understand if in this particular case my solution would
> > > > work.
> > > > Say I have source records [timestamp, (K,V)] in the input topic in the
> > > > following order:
> > > > .. [1, (KA, AA1)] [2, (KA, AB1)] [3, (KB, B1)] ...
> > > >
> > > > I create multiple streams out of input stream as:
> > > > input
> > > >     .branch(
> > > >         (k, v) -> //filter records of type AA,
> > > >         (k, v) -> //filter records of type AB,
> > > >         (k, v) -> //filter records of type B,
> > > >     )
> > > > Then my topology is written in following way:
> > > > //join stream of type AA with AB and push its value in state store
> > > > streamAA.join(streamAB).transform(new TransformerSupplier() {
> > > >     transform(K key, V value) {
> > > >         // timestamp = extract timestamp from (key, value)
> > > >         enrichMessageStore.put(key, value, timestamp);
> > > >         return new KeyValue(key, value);
> > > >     }
> > > > }, "enrich-message-store");
> > > > //fetch the data from that state store and enrich streamB
> > > > streamB.transform(new TransformerSupplier() {
> > > >     transform(K key, V value) {
> > > >         // (from, to) = extract (from, to) from (key, value)
> > > >         result = enrichMessageStore.fetchAll(from, to);
> > > >         //mutate value using value.enrich(result)
> > > >         return new KeyValue(key, value);
> > > >     }
> > > > }, "enrich-message-store");
> > > >
> > > > So does Kafka Streams ensure that records of streamB are processed
> > > > only after records of streamAA and streamAB are joined, since they
> > > > are in order?
> > > > Because if streamAA.join(streamAB) happens after streamB.transform(),
> > > > then it will not work.
> > > > I am assuming that since streamAA and streamAB records come before the
> > > > streamB record in the input topic, the join will also happen before.
> > > > If this assumption is not safe, is there any other way of ensuring
> > > > this?
> > > >
> > > > For now let's assume there is a single partition of the input topic.
> > > >
> > > > Thanks
> > > > Sachin
> > > >
> > > >
> > > >
> > > >
> > > > On Fri, Feb 21, 2020 at 4:57 AM Guozhang Wang <wangg...@gmail.com> wrote:
> > > >
> > > > > Hello Sachin,
> > > > >
> > > > > 1) It seems from your source code that in stream2.transform you are
> > > > > generating a new value and returning a new key-value pair:
> > > > >
> > > > > mutate value = enrich(value, result)
> > > > > return new KeyValue(key, value);
> > > > >
> > > > > ---------------
> > > > >
> > > > > Anyway, if you do not want to generate a new value object, and
> > > > > instead just call a member function like this:
> > > > >
> > > > > value.enrich(result)
> > > > > return new KeyValue(key, value);
> > > > >
> > > > > ---------------
> > > > >
> > > > > That actually still works, as long as your serde recognizes the
> > > > > optional enriched fields and can encode / decode the value object
> > > > > with / without the enriched fields.
> > > > >
> > > > > 2) Regarding your join scenario: if your join does not depend on any
> > > > > field but only on the time range (since you used fetchAll(), which
> > > > > returns ALL the key-values falling into that range), I think this
> > > > > would be the way to do it.
> > > > >
> > > > >
> > > > > Guozhang
> > > > >
> > > > >
> > > > > On Wed, Feb 19, 2020 at 10:25 PM Sachin Mittal <sjmit...@gmail.com> wrote:
> > > > >
> > > > > > Hi,
> > > > > > I have two streams and I want to enrich stream2 records based on
> > > > > > stream1 records.
> > > > > > I cannot really join those two streams because there is no common
> > > > > > key between them.
> > > > > > Hence the only way I can do that is by using a timestamp field.
> > > > > >
> > > > > > This is how I have built my pipeline.
> > > > > > .....
> > > > > > //create and add state store
> > > > > > final StoreBuilder<WindowStore<K, V>> enrichStoreBuilder =
> > > > > >     Stores.windowStoreBuilder(
> > > > > >         Stores.persistentWindowStore("enrich-message-store",
> > > > > >             retentionSize, windowSize, false), ...);
> > > > > > builder.addStateStore(enrichStoreBuilder);
> > > > > > .....
> > > > > > //push the data to state store
> > > > > > stream1.transform(new TransformerSupplier() {
> > > > > >     transform(K key, V value) {
> > > > > >         // timestamp = extract timestamp from (key, value)
> > > > > >         enrichMessageStore.put(key, value, timestamp);
> > > > > >         return new KeyValue(key, value);
> > > > > >     }
> > > > > > }, "enrich-message-store"); // must match the store name exactly
> > > > > > .....
> > > > > > //fetch the data from state store for stream2
> > > > > > stream2.transform(new TransformerSupplier() {
> > > > > >     transform(K key, V value) {
> > > > > >         // (from, to) = extract (from, to) from (key, value)
> > > > > >         result = enrichMessageStore.fetchAll(from, to);
> > > > > >         //mutate value = enrich(value, result)
> > > > > >         return new KeyValue(key, value);
> > > > > >     }
> > > > > > }, "enrich-message-store"); // must match the store name exactly
> > > > > > .......
> > > > > >
> > > > > > So is this something that would work? Basically, in one stream
> > > > > > transformation a state store is updated,
> > > > > > and in the second stream transformation values from that state
> > > > > > store are fetched and the stream2 value is enriched from them.
> > > > > > The enriched (mutated) value is returned downstream.
> > > > > >
> > > > > > So will this work and, most importantly, can we mutate the input
> > > > > > value itself in the transform function and return it, or should we
> > > > > > always create a new instance?
> > > > > >
> > > > > > Is there any better way of doing this?
> > > > > >
> > > > > > Thanks
> > > > > > Sachin
> > > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > > >
> > > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
> --
> -- Guozhang
>
