Hi, I have two streams and I want to enrich stream2 records based on stream1 records. I cannot join the two streams because there is no common key between them. Hence the only way I can do that is by using a timestamp field.
This is how I have built my pipeline.

    .....
    // create and add the state store
    final StoreBuilder<WindowStore<K, V>> enrichStoreBuilder = Stores.windowStoreBuilder(
        Stores.persistentWindowStore("enrich-message-store", retentionSize, windowSize, false),
        ...);
    builder.addStateStore(enrichStoreBuilder);
    .....
    // push the data to the state store
    stream1.transform(new TransformerSupplier() {
        transform(K key, V value) {
            // timestamp = extract timestamp from (key, value)
            enrichMessageStore.put(key, value, timestamp);
            return new KeyValue(key, value);
        }
    }, "enrich-message-store");
    .....
    // fetch the data from the state store for stream2
    stream2.transform(new TransformerSupplier() {
        transform(K key, V value) {
            // (from, to) = extract (from, to) from (key, value)
            result = enrichMessageStore.fetchAll(from, to);
            // mutate
            value = enrich(value, result);
            return new KeyValue(key, value);
        }
    }, "enrich-message-store");
    .......

So is this something that would work? Basically, in the first stream's transformation the state store is updated, and in the second stream's transformation values from that state store are fetched and the stream2 value is enriched with them. The enriched (mutated) value is then returned downstream.

So will this work, and most importantly, can we mutate the input value itself in the transform function and return it, or should we always create a new instance?

Is there any better way of doing this?

Thanks
Sachin
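
P.S. To make the intended semantics concrete, here is a minimal, dependency-free sketch of the lookup-and-enrich step. It does not use Kafka Streams at all: the TimestampStore class is a hypothetical TreeMap-based stand-in for the window store, and Message, enrich and the sample timestamps are likewise made up for illustration. I am assuming the range lookup is inclusive on both ends. The enrich step here builds a new instance instead of mutating the input, which is the variant I am asking about.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

public class EnrichSketch {

    // Hypothetical stand-in for the window store: stream1 values indexed by timestamp.
    // This is NOT the Kafka Streams WindowStore; it only mimics put + range fetch.
    static final class TimestampStore {
        private final TreeMap<Long, List<String>> byTimestamp = new TreeMap<>();

        void put(String value, long timestamp) {
            byTimestamp.computeIfAbsent(timestamp, t -> new ArrayList<>()).add(value);
        }

        // Returns every value with from <= timestamp <= to, in timestamp order.
        // Assumes from <= to (TreeMap.subMap rejects an inverted range).
        List<String> fetchAll(long from, long to) {
            List<String> result = new ArrayList<>();
            for (List<String> values : byTimestamp.subMap(from, true, to, true).values()) {
                result.addAll(values);
            }
            return result;
        }
    }

    // Hypothetical immutable stream2 value.
    static final class Message {
        final String payload;
        final List<String> enrichments;

        Message(String payload, List<String> enrichments) {
            this.payload = payload;
            this.enrichments = List.copyOf(enrichments); // defensive, unmodifiable copy
        }
    }

    // Enrich by creating a new instance; the input message is left untouched.
    static Message enrich(Message original, List<String> matches) {
        return new Message(original.payload, matches);
    }

    public static void main(String[] args) {
        TimestampStore store = new TimestampStore();
        // "stream1" side: push records into the store under their timestamps.
        store.put("a", 100L);
        store.put("b", 150L);
        store.put("c", 300L);

        // "stream2" side: look up the range derived from the record, then enrich.
        Message in = new Message("order-1", List.of());
        Message out = enrich(in, store.fetchAll(100L, 200L));

        System.out.println(out.payload + " -> " + out.enrichments);
        System.out.println("input untouched: " + in.enrichments.isEmpty());
    }
}
```

In this sketch the record at timestamp 300 falls outside the requested range, so only the first two stream1 values end up on the enriched copy, and the original message still has no enrichments.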