Hello Sachin,

1) From your source code, it seems that in stream2.transform you are generating a new value and returning a new key-value pair:

    mutate value = enrich(value, result)
    return new KeyValue(key, value);

---------------

Anyway, if you do not want to generate a new value object and would rather have a method on the value itself, like this:

    value.enrich(result)
    return new KeyValue(key, value);

---------------

That still works, as long as your serde recognizes the optional enriched fields and can encode / decode the value object with or without them.

2) Regarding your join scenario: if your join does not depend on any field but only on the time range (since you used fetchAll(), which returns ALL the key-values falling into that range), I think this would be the way to do it.

Guozhang

On Wed, Feb 19, 2020 at 10:25 PM Sachin Mittal <sjmit...@gmail.com> wrote:

> Hi,
> I have two streams and I want to enrich stream2 records based off stream1
> records.
> I really cannot join those two streams because there is no common key
> between them.
> Hence the only way I can do that is using a timestamp field property.
>
> This is how I have built my pipeline.
> .....
> //create and add state store
> final StoreBuilder<WindowStore<K, V>> enrichStoreBuilder =
>     Stores.windowStoreBuilder(
>         Stores.persistentWindowStore("enrich-message-store",
>             retentionSize, windowSize, false), ...);
> builder.addStateStore(enrichStoreBuilder);
> .....
> //push the data to state store
> stream1.transform(new TransformerSupplier() { transform(K key, V value) {
>     // timestamp = extract timestamp from (key, value)
>     enrichMessageStore.put(key, value, timestamp);
>     return new KeyValue(key, value);
> } }, "enrich-message-store");
> .....
> //fetch the data from state store for
> stream2.transform(new TransformerSupplier() { transform(K key, V value) {
>     // (from, to) = extract (from, to) from (key, value)
>     result = enrichMessageStore.fetchAll(from, to);
>     //mutate value = enrich(value, result)
>     return new KeyValue(key, value);
> } }, "enrich-message-store");
> .......
>
> So is this something that would work? Basically, in one stream
> transformation a state store is updated,
> and in the second stream transformation values from that state store are
> fetched and the stream2 value is enriched from those values.
> The enriched (mutated) value is returned downstream.
>
> So will this work and, most importantly, can we mutate the input value itself
> in the transform function and return the same, or should we always create new
> instances of the same?
>
> Is there any better way of doing this?
>
> Thanks
> Sachin
>

--
-- Guozhang
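
To illustrate point 1 of the reply, here is a minimal sketch of a value type whose encoding tolerates an optional enriched field, so that mutating the value in place with value.enrich(result) stays safe across serialization. Everything in it is an assumption made for illustration: the class name EnrichableValue, its two string fields, and the flag-byte layout do not come from the thread. It uses only the JDK; in a real Kafka Streams application the encode / decode logic would presumably live behind the Serializer and Deserializer of the value serde.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Optional;

// Hypothetical value type; the names and the byte layout are illustrative only.
public class EnrichableValue {
    private final String payload;   // original content of the stream2 record (non-null)
    private String enrichment;      // stays null until enrich() is called

    public EnrichableValue(String payload) {
        this.payload = payload;
    }

    // Mutates this object in place, like value.enrich(result) in the reply.
    public void enrich(String result) {
        this.enrichment = result;
    }

    public String payload() {
        return payload;
    }

    public Optional<String> enrichment() {
        return Optional.ofNullable(enrichment);
    }

    // Layout: 1 flag byte (0 = plain, 1 = enriched), then each present field
    // as a 4-byte length followed by its UTF-8 bytes.
    public static byte[] encode(EnrichableValue v) {
        byte[] p = v.payload.getBytes(StandardCharsets.UTF_8);
        byte[] e = v.enrichment == null ? null : v.enrichment.getBytes(StandardCharsets.UTF_8);
        int size = 1 + 4 + p.length + (e == null ? 0 : 4 + e.length);
        ByteBuffer buf = ByteBuffer.allocate(size);
        buf.put((byte) (e == null ? 0 : 1));
        buf.putInt(p.length).put(p);
        if (e != null) {
            buf.putInt(e.length).put(e);
        }
        return buf.array();
    }

    // Reads the flag byte first, so both plain and enriched encodings decode.
    public static EnrichableValue decode(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        boolean enriched = buf.get() == 1;
        byte[] p = new byte[buf.getInt()];
        buf.get(p);
        EnrichableValue v = new EnrichableValue(new String(p, StandardCharsets.UTF_8));
        if (enriched) {
            byte[] e = new byte[buf.getInt()];
            buf.get(e);
            v.enrich(new String(e, StandardCharsets.UTF_8));
        }
        return v;
    }

    public static void main(String[] args) {
        EnrichableValue value = new EnrichableValue("stream2-record");
        // Round trip before enrichment: the optional field is absent.
        System.out.println(decode(encode(value)).enrichment().isPresent());
        value.enrich("matched-stream1-data");
        // Round trip after in-place enrichment: the optional field survives.
        System.out.println(decode(encode(value)).enrichment().orElse("<none>"));
    }
}
```

The point of the flag byte is that records written before enrichment and records written after it can both be read by the same decoder, which is the condition the reply names for returning the mutated input value from transform.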