Hi,
I have two streams and I want to enrich stream2 records based on stream1
records.
I cannot join the two streams because there is no common key
between them.
Hence the only way I can do that is by using a timestamp field.

This is how I have built my pipeline.
.....
//create and add the state store
final StoreBuilder<WindowStore<K, V>> enrichStoreBuilder =
Stores.windowStoreBuilder(
            Stores.persistentWindowStore("enrich-message-store",
retentionSize, windowSize, false), ...);
builder.addStateStore(enrichStoreBuilder);
.....
//push the data to the state store
stream1.transform(new TransformerSupplier() { transform(K key, V value) {
                // timestamp = extract timestamp from (key, value)
                enrichMessageStore.put(key, value, timestamp);
                return new KeyValue(key, value);
} } , "enrich-message-store");
.....
//fetch the data from the state store to enrich stream2
stream2.transform(new TransformerSupplier() { transform(K key, V value) {
                // (from, to) = extract (from, to) from (key, value)
                result = enrichMessageStore.fetchAll(from, to);
                // mutate value = enrich(value, result)
                return new KeyValue(key, value);
} } , "enrich-message-store");
.......

So is this something that would work? Basically, the transformation on one
stream updates a state store, and the transformation on the second stream
fetches values from that state store and uses them to enrich the stream2 value.
The enriched (mutated) value is then returned downstream.
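
As a rough illustration of the lookup described above, here is a minimal
plain-Java sketch. It is only a model and not Kafka Streams code: the class
name TimeIndexedStoreSketch, the String value type, and the inclusive
(from, to) bounds are assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Plain-Java model of the lookup, NOT the Kafka Streams WindowStore API.
class TimeIndexedStoreSketch {
    // Values grouped by timestamp and kept sorted, so range queries are cheap.
    private final TreeMap<Long, List<String>> byTimestamp = new TreeMap<>();

    // Corresponds to the stream1 side: remember a value under its timestamp.
    void put(long timestamp, String value) {
        byTimestamp.computeIfAbsent(timestamp, t -> new ArrayList<>()).add(value);
    }

    // Corresponds to the stream2 side: collect every value whose timestamp
    // satisfies from <= timestamp <= to (inclusive bounds are an assumption).
    List<String> fetchAll(long from, long to) {
        List<String> result = new ArrayList<>();
        for (List<String> values : byTimestamp.subMap(from, true, to, true).values()) {
            result.addAll(values);
        }
        return result;
    }

    public static void main(String[] args) {
        TimeIndexedStoreSketch store = new TimeIndexedStoreSketch();
        store.put(100L, "a");
        store.put(200L, "b");
        store.put(300L, "c");
        System.out.println(store.fetchAll(100L, 200L)); // prints [a, b]
    }
}
```

In the real pipeline the store would be the shared "enrich-message-store",
written by the stream1 transformer and read by the stream2 transformer.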

So will this work and, most importantly, can we mutate the input value itself
in the transform function and return it, or should we always create a new
instance?
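
For comparison, the "create a new instance" alternative could look roughly
like the sketch below. The value type EnrichedValueSketch and its fields are
hypothetical, since the real V is not shown here; the point is only that
enrichment returns a copy and leaves the input object untouched.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical value type; the real V in the pipeline is not shown in this post.
final class EnrichedValueSketch {
    final String payload;
    final List<String> enrichments;

    EnrichedValueSketch(String payload, List<String> enrichments) {
        this.payload = payload;
        // Defensive copy so later changes to the caller's list cannot leak in.
        this.enrichments = Collections.unmodifiableList(new ArrayList<>(enrichments));
    }

    // Returns a new instance carrying the fetched values; 'this' is not modified.
    EnrichedValueSketch enrichedWith(List<String> fetched) {
        List<String> merged = new ArrayList<>(enrichments);
        merged.addAll(fetched);
        return new EnrichedValueSketch(payload, merged);
    }
}
```

With this shape, the stream2 transformer would return
new KeyValue(key, value.enrichedWith(result)) instead of changing value in place.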

Is there a better way of doing this?

Thanks
Sachin
