From the description it seems there are no repartition topics in your topology; in that case the ordering should be guaranteed. That is, a record from the input topic would only be processed after all previous records from that same topic have been processed entirely. If there are repartition topics in between, though, this may not hold.
Also a minor thing: if your streamAA.join(streamAB) results are only needed for populating the store, you can use `process(..)` instead of `transform(..)`, as it does not need a return value since it is the end of this branch of the topology.

Guozhang

On Thu, Feb 20, 2020 at 7:32 PM Sachin Mittal <sjmit...@gmail.com> wrote:

> Hi,
> I wanted to understand if in this particular case my solution would work.
> Say I have source records [timestamp, (K,V)] in the input topic in the
> following order:
>
>   .. [1, (KA, AA1)] [2, (KA, AB1)] [3, (KB, B1)] ...
>
> I create multiple streams out of the input stream as:
>
>   input.branch(
>       (k, v) -> // filter records of type AA,
>       (k, v) -> // filter records of type AB,
>       (k, v) -> // filter records of type B
>   )
>
> Then my topology is written in the following way:
>
>   // join stream of type AA with AB and push its value into the state store
>   streamAA.join(streamAB).transform(new TransformerSupplier() {
>       transform(K key, V value) {
>           // timestamp = extract timestamp from (key, value)
>           enrichMessageStore.put(key, value, timestamp);
>           return new KeyValue(key, value);
>       }
>   }, "enrich-message-store");
>
>   // fetch the data from that state store and enrich streamB
>   streamB.transform(new TransformerSupplier() {
>       transform(K key, V value) {
>           // (from, to) = extract (from, to) from (key, value)
>           result = enrichMessageStore.fetchAll(from, to);
>           // mutate value using value.enrich(result)
>           return new KeyValue(key, value);
>       }
>   }, "enrich-message-store");
>
> So does Kafka Streams ensure that records of streamB would be processed
> only after records of streamAA and streamAB are joined, since they are in
> order? Because if the operation streamAA.join(streamAB) happens after
> streamB.transform() then it will not work.
> I am assuming that since streamAA and streamAB types of records come
> before streamB types of records in the input topic, the join will also
> happen first. If this assumption is not safe, then is there any other way
> of ensuring it?
>
> For now let's assume there is a single partition of the input topic.
>
> Thanks
> Sachin
>
>
> On Fri, Feb 21, 2020 at 4:57 AM Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hello Sachin,
> >
> > 1) It seems from your source code that in the stream2.transform you are
> > generating a new value and returning a new key-value pair:
> >
> >   mutate value = enrich(value, result)
> >   return new KeyValue(key, value);
> >
> > ---------------
> >
> > Anyways, if you do not want to generate a new value object, and just
> > have a field function like this:
> >
> >   value.enrich(result)
> >   return new KeyValue(key, value);
> >
> > ---------------
> >
> > that actually still works, as long as your serde function recognizes
> > the optional enriched fields and can encode / decode the value object
> > with / without them.
> >
> > 2) And regarding your join scenario: if your join does not depend on
> > any field, but only on the time range (since you used fetchAll(), which
> > returns ALL the key-values falling into that range), I think this would
> > be the way to do it.
> >
> >
> > Guozhang
> >
> >
> > On Wed, Feb 19, 2020 at 10:25 PM Sachin Mittal <sjmit...@gmail.com>
> > wrote:
> >
> > > Hi,
> > > I have two streams and I want to enrich stream2 records based on
> > > stream1 records.
> > > I really cannot join those two streams because there is no common key
> > > between them.
> > > Hence the only way I can do that is using a timestamp field property.
> > >
> > > This is how I have built my pipeline:
> > > .....
> > > // create and add state store
> > > final StoreBuilder<WindowStore<K, V>> enrichStoreBuilder =
> > >     Stores.windowStoreBuilder(
> > >         Stores.persistentWindowStore("enrich-message-store",
> > >             retentionSize, windowSize, false), ...);
> > > builder.addStateStore(enrichStoreBuilder);
> > > .....
> > > // push the data to the state store
> > > stream1.transform(new TransformerSupplier() {
> > >     transform(K key, V value) {
> > >         // timestamp = extract timestamp from (key, value)
> > >         enrichMessageStore.put(key, value, timestamp);
> > >         return new KeyValue(key, value);
> > >     }
> > > }, "enrich-message-store");
> > > .....
> > > // fetch the data from the state store
> > > stream2.transform(new TransformerSupplier() {
> > >     transform(K key, V value) {
> > >         // (from, to) = extract (from, to) from (key, value)
> > >         result = enrichMessageStore.fetchAll(from, to);
> > >         // mutate value = enrich(value, result)
> > >         return new KeyValue(key, value);
> > >     }
> > > }, "enrich-message-store");
> > > .......
> > >
> > > So is this something that would work? Basically, in one stream
> > > transformation a state store is updated, and in the second stream
> > > transformation values from that state store are fetched and the
> > > stream2 value is enriched with those values.
> > > The enriched (mutated) value is returned back downstream.
> > >
> > > So will this work, and most importantly, can we mutate the input
> > > value itself in the transform function and return the same, or
> > > should we always create new instances of it?
> > >
> > > Is there any better way of doing this?
> > >
> > > Thanks
> > > Sachin
> >
> > --
> > -- Guozhang


-- 
-- Guozhang
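[Editor's note] To make the `process(..)` suggestion at the top of the thread concrete, here is a minimal sketch against the Kafka Streams 2.x Processor API. The String type parameters, the join window, and the merge()/extractTimestamp() helpers are assumptions, not from the original posts; only the store name "enrich-message-store" comes from the thread.

```java
import java.time.Duration;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.WindowStore;

// Since this branch of the topology ends after populating the store,
// process(..) (which returns void) replaces transform(..): no dummy
// KeyValue has to be returned downstream.
streamAA.join(streamAB,
              (vAA, vAB) -> merge(vAA, vAB),        // merge(): hypothetical value joiner
              JoinWindows.of(Duration.ofMinutes(5))) // window size is an assumption
        .process(() -> new Processor<String, String>() {
            private WindowStore<String, String> store;

            @SuppressWarnings("unchecked")
            @Override
            public void init(final ProcessorContext context) {
                store = (WindowStore<String, String>) context.getStateStore("enrich-message-store");
            }

            @Override
            public void process(final String key, final String value) {
                // extractTimestamp(): hypothetical helper pulling the event
                // time out of the joined value
                store.put(key, value, extractTimestamp(value));
                // no return value: the branch terminates here
            }

            @Override
            public void close() { }
        }, "enrich-message-store");
```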
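[Editor's note] For reference, the two-step pipeline the poster describes could be sketched as follows (Kafka Streams 2.x `transform` API). The String types, serdes, retention and window durations, and the extractTimestamp()/extractFrom()/extractTo()/enrich() helpers are assumptions standing in for the poster's own logic.

```java
import java.time.Duration;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowStore;

// 1) Register the shared window store (name from the thread; durations assumed).
final StoreBuilder<WindowStore<String, String>> enrichStoreBuilder =
    Stores.windowStoreBuilder(
        Stores.persistentWindowStore("enrich-message-store",
            Duration.ofHours(1),    // retention, assumed
            Duration.ofMinutes(5),  // window size, assumed
            false),                 // do not retain duplicates
        Serdes.String(), Serdes.String());
builder.addStateStore(enrichStoreBuilder);

// 2) stream1 writes each record into the store at its extracted timestamp.
stream1.transform(() -> new Transformer<String, String, KeyValue<String, String>>() {
    private WindowStore<String, String> store;

    @SuppressWarnings("unchecked")
    @Override
    public void init(final ProcessorContext context) {
        store = (WindowStore<String, String>) context.getStateStore("enrich-message-store");
    }

    @Override
    public KeyValue<String, String> transform(final String key, final String value) {
        store.put(key, value, extractTimestamp(value)); // hypothetical helper
        return KeyValue.pair(key, value);
    }

    @Override
    public void close() { }
}, "enrich-message-store");

// 3) stream2 fetches everything stored in [from, to] and enriches its value.
stream2.transform(() -> new Transformer<String, String, KeyValue<String, String>>() {
    private WindowStore<String, String> store;

    @SuppressWarnings("unchecked")
    @Override
    public void init(final ProcessorContext context) {
        store = (WindowStore<String, String>) context.getStateStore("enrich-message-store");
    }

    @Override
    public KeyValue<String, String> transform(final String key, final String value) {
        final long from = extractFrom(value); // hypothetical helpers
        final long to = extractTo(value);
        try (final KeyValueIterator<Windowed<String>, String> iter = store.fetchAll(from, to)) {
            // enrich(value, iter): stand-in for the poster's mutation logic
            return KeyValue.pair(key, enrich(value, iter));
        }
    }

    @Override
    public void close() { }
}, "enrich-message-store");
```

Note that, per the reply at the top of the thread, relying on input-topic order to guarantee that step 2 runs after step 3 only holds while there are no repartition topics between the branches and the input topic has a single partition.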