>From the description it seems there's no repartition topics throughout your
topology, in that case the ordering should be guaranteed. I.e. a record
from the input topic would only be processed after all previous records
from that same topics have been processed entirely; if there's repartition
topics in between though, this may not hold.

Also a minor thing that if your streamAA.join(streamBB) results are only
needed for populating the store, you can use `process(..)` instead of
`transform(..)` as it would not need a return value since it would be the
end of this branch of the topology.


Guozhang

On Thu, Feb 20, 2020 at 7:32 PM Sachin Mittal <sjmit...@gmail.com> wrote:

> Hi,
> I wanted to understand if in this particular case my solution would work:
> Say I have source records [timestamp, (K,V)] in input topic in following
> order:
> .. [1, (KA, AA1)] [2, (KA, AB1)] [3, (KB, B1)] ...
>
> I create multiple streams out of input stream as:
> input
>     .branch(
>         (k, v) -> //filter records of type AA,
>         (k, v) -> //filter records of type AB,
>         (k, v) -> //filter records of type B,
>     )
> Then my topology is written in following way:
> //join stream of type AA with AB and push its value in state store
> streamAA.join(streamAB).transform( new TransformerSupplier() { transform(K
> key, V value) {
>                 // timestamp = extract timestamp from (key, value)
>                 enrichMessageStore.put(key , value, timestamp);
>                 return new KeyValue(key, value);
> } } , "enrich-message-store")
> //fetch the data from that state store and enrich streamB
> streamB.transform(new TransformerSupplier() { transform(K key, V value) {
>                 // (from, to) = extract (from, to) from (key, value)
>                 result = enrichMessageStore.fetchAll( from, to );
>                 //mutate value using value.enrich(result)
>                 return new KeyValue(key, value);
> } } , "enrich-message-store");
>
> So does kafka stream ensure that records of streamB would be processed only
> after records of streamAA and streamAB are joined since they are in order?
> Because if the operation of streamAA.join(streamAB) happens after
> streamB.transform()
> then it will not work.
> I am assuming that since streamAA and streamAB types of records are before
> streamB type of record in the input topic the join will also happen before.
> If this assumption is not safe then is there any other way of ensuring.
>
> For now lets assume there is single partition of the input topic.
>
> Thanks
> Sachin
>
>
>
>
> On Fri, Feb 21, 2020 at 4:57 AM Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hello Sachin,
> >
> > 1) It seems from your source code, that in the stream2.transform you are
> > generating a new value and return a new key-value pair:
> >
> > mutate value = enrich(value, result)
> > return new KeyValue(key, value);
> >
> > ---------------
> >
> > Anyways, if you do not want to generate a new value object, and just
> have a
> > field function like this:
> >
> > value.enrich(result)
> > return new KeyValue(key, value);
> >
> > ---------------
> >
> > That actually still works as long as your serde function recognize the
> > optional enriched fields and can encode / decode the value object with /
> > without the enriched fields.
> >
> > 2) And regarding your join scenario, if your join does not depend on any
> > field, but only depending on the time-range (since you used fetchAll()
> > which would return ALL the key-values falling into that range) I think
> this
> > would be the way to do it.
> >
> >
> > Guozhang
> >
> >
> > On Wed, Feb 19, 2020 at 10:25 PM Sachin Mittal <sjmit...@gmail.com>
> wrote:
> >
> > > Hi,
> > > I have two streams and I want to enrich stream2 records based off
> stream1
> > > records.
> > > I really cannot join those two streams is because there is no common
> key
> > > between them.
> > > Hence only way I can do that is using a timestamp field property.
> > >
> > > This is how I have built my pipeline.
> > > .....
> > > //create and add state store
> > > final StoreBuilder<WindowStore<K, V>>> enrichStoreBuilder =
> > > Stores.windowStoreBuilder(
> > >             Stores.persistentWindowStore("enrich-message-store",
> > > retentionSize, windowSize, false), ...);
> > > builder.addStateStore(enrichStoreBuilder)
> > > .....
> > > //push the data to state store
> > > stream1.transform(new TransformerSupplier() { transform(K key, V
> value) {
> > >                 // timestamp = extract timestamp from (key, value)
> > >                 enrichMessageStore.put(key , value, timestamp);
> > >                 return new KeyValue(key, value);
> > > } } , " enrich-message-store");
> > > .....
> > > //fetch the data from state store for
> > > stream2.transform(new TransformerSupplier() { transform(K key, V
> value) {
> > >                 // (from, to) = extract (from, to) from (key, value)
> > >                 result = enrichMessageStore.fetchAll( from, to );
> > >                 //mutate value = enrich(value, result)
> > >                 return new KeyValue(key, value);
> > > } } , " enrich-message-store");
> > > .......
> > >
> > > So is this something that would work. Basically in one stream
> > > transformation a state store is updated.
> > > And in second stream transformation values from that state stores are
> > > fetched and stream2 value is enriched from those values.
> > > The enriched (mutated) value is returned back to downstream.
> > >
> > > So will this work and most important can we mutate the input value
> itself
> > > in transform function and return the same or we should always create
> new
> > > instances of the same.
> > >
> > > Is there any better way of doing this.
> > >
> > > Thanks
> > > Sachin
> > >
> >
> >
> > --
> > -- Guozhang
> >
>


-- 
-- Guozhang

Reply via email to