Hi,
I wanted to understand whether my solution would work in this particular case.
Say I have source records [timestamp, (K, V)] in the input topic in the following order:

..
[1, (KA, AA1)]
[2, (KA, AB1)]
[3, (KB, B1)]
...
I create multiple streams out of the input stream as:

    input.branch(
        (k, v) -> /* filter records of type AA */,
        (k, v) -> /* filter records of type AB */,
        (k, v) -> /* filter records of type B */
    )

Then my topology is written in the following way:

    // join stream of type AA with AB and push its value into the state store
    streamAA.join(streamAB, /* ValueJoiner */, /* JoinWindows */).transform(new TransformerSupplier() {
        transform(K key, V value) {
            // timestamp = extract timestamp from (key, value)
            enrichMessageStore.put(key, value, timestamp);
            return new KeyValue(key, value);
        }
    }, "enrich-message-store");

    // fetch the data from that state store and enrich streamB
    streamB.transform(new TransformerSupplier() {
        transform(K key, V value) {
            // (from, to) = extract (from, to) from (key, value)
            result = enrichMessageStore.fetchAll(from, to);
            // mutate value using value.enrich(result)
            return new KeyValue(key, value);
        }
    }, "enrich-message-store");

So does Kafka Streams ensure that the records of streamB are processed only after the records of streamAA and streamAB have been joined, given that they come first in the input? If streamAA.join(streamAB) happens after streamB.transform(), this will not work.
I am assuming that since the AA and AB records precede the B record in the input topic, the join will also happen first. If this assumption is not safe, is there another way of ensuring this ordering?
For now, let's assume the input topic has a single partition.
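To reason about the ordering question, here is a plain-Java model (not Kafka Streams code) of one stream task reading a single partition. It assumes that the branches, the join and both transformers end up in one sub-topology with no repartition topic in between, and that the task hands each record depth-first through its branch before taking the next offset, which is my understanding of how a task forwards records through its processor nodes. The class `DepthFirstSketch`, the "type by value prefix" predicates, the unbounded join buffers (no JoinWindows), the result timestamp choice and the fixed [0, ts] fetch range are all hypothetical simplifications.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java model (not Kafka Streams code) of one task processing one partition depth-first. */
public class DepthFirstSketch {

    /** One input record: [timestamp, (key, value)]. */
    static final class Rec {
        final long ts;
        final String key;
        final String value;

        Rec(long ts, String key, String value) {
            this.ts = ts;
            this.key = key;
            this.value = value;
        }
    }

    // Stand-in for "enrich-message-store": joined values indexed by timestamp only,
    // because the streamB side reads it with a time-range fetchAll(from, to).
    final TreeMap<Long, List<String>> enrichStore = new TreeMap<>();

    // Stand-ins for the two join buffers of streamAA.join(streamAB), keyed by record key.
    final Map<String, List<Rec>> aaBuffer = new HashMap<>();
    final Map<String, List<Rec>> abBuffer = new HashMap<>();

    // Values forwarded downstream by the streamB transformer.
    final List<String> enrichedB = new ArrayList<>();

    /** Handles one record completely (branch, join, store update or lookup) before returning. */
    void process(Rec r) {
        if (r.value.startsWith("AA")) {          // hypothetical predicate for type AA
            join(r, aaBuffer, abBuffer, true);
        } else if (r.value.startsWith("AB")) {   // hypothetical predicate for type AB
            join(r, abBuffer, aaBuffer, false);
        } else if (r.value.startsWith("B")) {    // hypothetical predicate for type B
            enrichB(r);
        }
    }

    private void join(Rec r, Map<String, List<Rec>> own, Map<String, List<Rec>> other, boolean isAA) {
        own.computeIfAbsent(r.key, k -> new ArrayList<>()).add(r);
        for (Rec o : other.getOrDefault(r.key, List.of())) {
            String joined = isAA ? r.value + "+" + o.value : o.value + "+" + r.value;
            long ts = Math.max(r.ts, o.ts); // this sketch stamps the result with the later input time
            // Equivalent of enrichMessageStore.put(key, value, timestamp) in the first transformer.
            enrichStore.computeIfAbsent(ts, t -> new ArrayList<>()).add(joined);
        }
    }

    private void enrichB(Rec r) {
        // Hypothetical (from, to): everything up to this record's timestamp, both bounds inclusive.
        long from = 0L;
        long to = r.ts;
        List<String> result = new ArrayList<>();
        for (List<String> values : enrichStore.subMap(from, true, to, true).values()) {
            result.addAll(values);
        }
        enrichedB.add(r.value + " enriched with " + result);
    }

    public static void main(String[] args) {
        DepthFirstSketch task = new DepthFirstSketch();
        // Offsets 0, 1, 2 of the single partition, processed strictly in this order.
        List<Rec> input = List.of(
                new Rec(1, "KA", "AA1"),
                new Rec(2, "KA", "AB1"),
                new Rec(3, "KB", "B1"));
        for (Rec r : input) {
            task.process(r);
        }
        System.out.println(task.enrichedB); // prints [B1 enriched with [AA1+AB1]]
    }
}
```

Under these assumptions the joined AA1+AB1 value is already in the store when B1 is processed; feeding B1 first yields an empty result, which is exactly the failure mode in question. Connecting "enrich-message-store" to both transform() calls should keep those processors in the same sub-topology, but I would not rely on plain offset order if a key-changing operation put a repartition topic between the join and the streamB branch. With more than one partition, each task also has its own local instance of the store.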
Thanks
Sachin

On Fri, Feb 21, 2020 at 4:57 AM Guozhang Wang <wangg...@gmail.com> wrote:

> Hello Sachin,
>
> 1) It seems from your source code that in the stream2.transform you are
> generating a new value and returning a new key-value pair:
>
> mutate value = enrich(value, result)
> return new KeyValue(key, value);
>
> ---------------
>
> Anyway, if you do not want to generate a new value object and just have a
> field function like this:
>
> value.enrich(result)
> return new KeyValue(key, value);
>
> ---------------
>
> That actually still works as long as your serde function recognizes the
> optional enriched fields and can encode / decode the value object with /
> without the enriched fields.
>
> 2) And regarding your join scenario, if your join does not depend on any
> field, but only on the time range (since you used fetchAll(),
> which returns ALL the key-values falling into that range), I think this
> would be the way to do it.
>
>
> Guozhang
>
>
> On Wed, Feb 19, 2020 at 10:25 PM Sachin Mittal <sjmit...@gmail.com> wrote:
>
> > Hi,
> > I have two streams and I want to enrich stream2 records based on stream1
> > records.
> > I really cannot join those two streams because there is no common key
> > between them.
> > Hence the only way I can do that is by using a timestamp field property.
> >
> > This is how I have built my pipeline.
> > .....
> > // create and add state store
> > final StoreBuilder<WindowStore<K, V>> enrichStoreBuilder =
> >     Stores.windowStoreBuilder(
> >         Stores.persistentWindowStore("enrich-message-store",
> >             retentionSize, windowSize, false), ...);
> > builder.addStateStore(enrichStoreBuilder);
> > .....
> > // push the data to state store
> > stream1.transform(new TransformerSupplier() { transform(K key, V value) {
> >     // timestamp = extract timestamp from (key, value)
> >     enrichMessageStore.put(key, value, timestamp);
> >     return new KeyValue(key, value);
> > } }, "enrich-message-store");
> > .....
> >
> > // fetch the data from the state store for stream2
> > stream2.transform(new TransformerSupplier() { transform(K key, V value) {
> >     // (from, to) = extract (from, to) from (key, value)
> >     result = enrichMessageStore.fetchAll(from, to);
> >     // mutate value = enrich(value, result)
> >     return new KeyValue(key, value);
> > } }, "enrich-message-store");
> > .......
> >
> > So is this something that would work? Basically, in one stream
> > transformation a state store is updated,
> > and in the second stream transformation values from that state store are
> > fetched and the stream2 value is enriched from those values.
> > The enriched (mutated) value is returned downstream.
> >
> > So will this work? And most importantly, can we mutate the input value itself
> > in the transform function and return it, or should we always create new
> > instances?
> >
> > Is there any better way of doing this?
> >
> > Thanks
> > Sachin
> >
>
>
> --
> -- Guozhang
>
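Guozhang's first point, that mutating the value in place works as long as the serde can encode / decode the value with or without the enriched fields, can be illustrated with a small plain-Java sketch. `EventValue`, its fields and the presence-flag binary layout are hypothetical; in a real application this logic would sit behind Kafka's `Serializer` / `Deserializer` interfaces (for example combined with `Serdes.serdeFrom(...)`).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical value type whose "enrichment" field is optional on the wire. */
public class EventValue {
    final String payload;
    List<String> enrichment; // null until enrich() is called

    EventValue(String payload, List<String> enrichment) {
        this.payload = payload;
        this.enrichment = enrichment;
    }

    /** Mutates this instance in place, as in value.enrich(result). */
    void enrich(List<String> result) {
        this.enrichment = new ArrayList<>(result);
    }

    /** Writes the payload plus a presence flag, so values with and without enrichment both round-trip. */
    byte[] serialize() {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(payload);
            out.writeBoolean(enrichment != null);
            if (enrichment != null) {
                out.writeInt(enrichment.size());
                for (String e : enrichment) {
                    out.writeUTF(e);
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Reads the optional enrichment only when the presence flag is set. */
    static EventValue deserialize(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            String payload = in.readUTF();
            List<String> enrichment = null;
            if (in.readBoolean()) {
                int n = in.readInt();
                enrichment = new ArrayList<>(n);
                for (int i = 0; i < n; i++) {
                    enrichment.add(in.readUTF());
                }
            }
            return new EventValue(payload, enrichment);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        EventValue value = new EventValue("B1", null);
        value.enrich(List.of("AA1+AB1"));                 // in-place mutation
        EventValue copy = deserialize(value.serialize()); // round trip through bytes
        System.out.println(copy.payload + " " + copy.enrichment); // prints B1 [AA1+AB1]
    }
}
```

One caveat worth checking before mutating in place: as far as I understand, a processor forwards the same value object to each of its child processors, so if stream2 fed more than one downstream operation, a mutation made in one of them would be visible to the others. Returning a new instance avoids that.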