Hello Sachin,

1) It seems from your source code that in stream2.transform you are
generating a new value and returning a new key-value pair:

V enriched = enrich(value, result);   // enrich() creates a new value object
return new KeyValue<>(key, enriched);
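To make this first style concrete, here is a minimal plain-Java sketch (no Kafka dependency) assuming a hypothetical immutable `Event` type that is not taken from your code: `enrich` returns a fresh object, so the input record is never modified.

```java
import java.util.List;

public class CopyOnEnrich {
    // Hypothetical immutable value type; 'enrichment' is null until enriched.
    record Event(String payload, List<String> enrichment) {}

    // Builds a NEW Event carrying the matched records; the input is left unchanged.
    static Event enrich(Event value, List<String> result) {
        return new Event(value.payload(), List.copyOf(result));
    }

    public static void main(String[] args) {
        Event original = new Event("order-42", null);
        Event enriched = enrich(original, List.of("price=10", "price=12"));
        System.out.println(original.enrichment()); // null
        System.out.println(enriched.enrichment()); // [price=10, price=12]
    }
}
```

The cost of this style is one extra allocation per record; the benefit is that nothing else holding a reference to the input value can observe the change.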

---------------

Anyway, if you do not want to create a new value object and instead just
call a method that sets the enriched fields on the existing one, like this:

value.enrich(result);   // sets the enriched fields on the same object
return new KeyValue<>(key, value);

---------------

That still works, as long as your serde recognizes the optional enriched
fields and can serialize and deserialize the value object both with and
without them.
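A minimal sketch of the in-place style together with a serializer that tolerates the optional field, assuming a hypothetical `Event` class with one optional string and a made-up byte layout. In a real topology the two static methods would sit behind Kafka's `Serializer`/`Deserializer` interfaces (for example combined via `Serdes.serdeFrom(...)`); that wiring is left out so the sketch runs on the JDK alone.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class OptionalFieldSerde {
    // Hypothetical mutable value type; 'enrichment' stays null until enrich() is called.
    static final class Event {
        final String payload;
        String enrichment; // optional field

        Event(String payload) { this.payload = payload; }

        // In-place enrichment: sets the optional field on this same object.
        void enrich(String result) { this.enrichment = result; }
    }

    // Layout: [int len][payload][flag byte: 0 = absent, 1 = present][int len][enrichment]
    static byte[] serialize(Event e) {
        byte[] p = e.payload.getBytes(StandardCharsets.UTF_8);
        if (e.enrichment == null) {
            return ByteBuffer.allocate(4 + p.length + 1)
                    .putInt(p.length).put(p).put((byte) 0).array();
        }
        byte[] x = e.enrichment.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + p.length + 1 + 4 + x.length)
                .putInt(p.length).put(p).put((byte) 1).putInt(x.length).put(x).array();
    }

    static Event deserialize(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        byte[] p = new byte[buf.getInt()];
        buf.get(p);
        Event e = new Event(new String(p, StandardCharsets.UTF_8));
        // A missing flag byte (data written before enrichment existed) means "not enriched".
        if (buf.hasRemaining() && buf.get() == 1) {
            byte[] x = new byte[buf.getInt()];
            buf.get(x);
            e.enrichment = new String(x, StandardCharsets.UTF_8);
        }
        return e;
    }

    public static void main(String[] args) {
        Event value = new Event("order-42");
        System.out.println(deserialize(serialize(value)).enrichment); // null

        value.enrich("price=10"); // mutate the same object in place
        System.out.println(deserialize(serialize(value)).enrichment); // price=10
    }
}
```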

2) As for your join scenario: if the join does not depend on any field but
only on the time range (since you used fetchAll(), which returns ALL the
key-value pairs falling into that range), then I think this is the way to
do it.
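The range semantics can be illustrated without Kafka. This hypothetical `TimeRangeEnricher` uses a `TreeMap` as a stand-in for the window store, with `put` playing the stream1 side and `fetchAll(from, to)` the stream2 side. It treats both bounds as inclusive and drops keys because the join ignores them; it approximates what `WindowStore#fetchAll` returns, it is not how that store is implemented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

public class TimeRangeEnricher {
    // Hypothetical stand-in for the window store: timestamp -> values put at that time.
    private final TreeMap<Long, List<String>> byTime = new TreeMap<>();

    // stream1 side: remember each value under its extracted timestamp.
    void put(String value, long timestamp) {
        byTime.computeIfAbsent(timestamp, t -> new ArrayList<>()).add(value);
    }

    // stream2 side: every value stored in [from, to], regardless of key.
    List<String> fetchAll(long from, long to) {
        List<String> out = new ArrayList<>();
        if (from > to) {
            return out; // empty range; TreeMap.subMap would throw here
        }
        for (List<String> bucket : byTime.subMap(from, true, to, true).values()) {
            out.addAll(bucket);
        }
        return out;
    }

    public static void main(String[] args) {
        TimeRangeEnricher store = new TimeRangeEnricher();
        store.put("a", 100L);
        store.put("b", 150L);
        store.put("c", 150L);
        store.put("d", 300L);
        System.out.println(store.fetchAll(120L, 200L)); // [b, c]
    }
}
```

Unlike the real store, this map never expires entries; in Kafka Streams the store's retention period roughly bounds how far back a range fetch can reach.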


Guozhang


On Wed, Feb 19, 2020 at 10:25 PM Sachin Mittal <sjmit...@gmail.com> wrote:

> Hi,
> I have two streams and I want to enrich stream2 records based on stream1
> records.
> I really cannot join those two streams because there is no common key
> between them.
> Hence the only way I can do that is by using a timestamp field.
>
> This is how I have built my pipeline.
> .....
> //create and add state store
> final StoreBuilder<WindowStore<K, V>> enrichStoreBuilder =
> Stores.windowStoreBuilder(
>             Stores.persistentWindowStore("enrich-message-store",
> retentionSize, windowSize, false), ...);
> builder.addStateStore(enrichStoreBuilder);
> .....
> //push the data to state store
> stream1.transform(new TransformerSupplier() { transform(K key, V value) {
>                 // timestamp = extract timestamp from (key, value)
>                 enrichMessageStore.put(key , value, timestamp);
>                 return new KeyValue(key, value);
> } } , "enrich-message-store");
> .....
> //fetch the data from state store for
> stream2.transform(new TransformerSupplier() { transform(K key, V value) {
>                 // (from, to) = extract (from, to) from (key, value)
>                 result = enrichMessageStore.fetchAll( from, to );
>                 //mutate value = enrich(value, result)
>                 return new KeyValue(key, value);
> } } , "enrich-message-store");
> .......
>
> So is this something that would work? Basically, in one stream
> transformation a state store is updated,
> and in the second stream transformation values are fetched from that state
> store and the stream2 value is enriched with them.
> The enriched (mutated) value is returned downstream.
>
> So will this work? And, most importantly, can we mutate the input value
> itself in the transform function and return it, or should we always create
> new instances?
>
> Is there any better way of doing this?
>
> Thanks
> Sachin
>


-- 
-- Guozhang
