Hi Alessandro,

For merging the three streams, have you considered the `KStream.merge` method? If the values are of different types, you'll need to map them into a common type first, but I think something like this will work:

    KStream mappedOne = originalStreamOne.mapValues(...);
    KStream mappedTwo = originalStreamTwo.mapValues(...);
    KStream mappedThree = originalStreamThree.mapValues(...);

    KStream mergedStream = mappedOne.merge(mappedTwo).merge(mappedThree);

Just keep in mind that there are no ordering guarantees between records from the different input streams in the merged stream; only the relative order within each input stream is preserved. Also, I assumed the keys are all of the same type. If they are not, you'll have to change the `mapValues` call to a `map`.

HTH,
Bill

On Mon, May 11, 2020 at 11:02 PM Alessandro Tagliapietra <tagliapietra.alessan...@gmail.com> wrote:

> Hello everyone,
>
> we currently use 3 streams (metrics, events, states) and I need to
> implement a keepalive mechanism so that if the machine doesn't send any
> data (from a specific list of variables) it'll emit a value that changes
> the machine state.
>
> For example, in machine 1 the list of keepalive variables is "foo" and
> "bar". To propagate the list of variables I use a configuration topic that
> uses a GlobalKTable so that each stream application can read the machine's
> configuration.
>
> Then my idea was to "merge" all three streams into one so that I can
> use a ValueTransformer to:
> - read the configuration store and ignore messages that don't belong to
>   the configured variables for a machine
> - use a local state store to save the "last seen" of a machine
> - use the punctuator to emit a "change" in the machine status if the "last
>   seen" of a machine is older than some time
>
> To "merge" the 3 streams I was thinking of just mapping them into a single
> intermediate topic and having the ValueTransformer read from that.
>
> Is there a better way? Maybe without using an intermediate topic?
>
> Thank you in advance
>
> --
> Alessandro Tagliapietra
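
For the keepalive part of the quoted question, here is a minimal plain-Java sketch of the "last seen" bookkeeping, independent of Kafka Streams. The class name `KeepaliveTracker` and its methods are hypothetical, made up for illustration. In a real topology I'd assume the map would be a state store, the set of keepalive variables would come from the configuration store, and `expire` would be called from a punctuator.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical helper for illustration only; not part of the Kafka Streams API.
final class KeepaliveTracker {
    private final long timeoutMs;
    // Stand-in for a local state store: machine id -> last-seen timestamp (ms).
    private final Map<String, Long> lastSeen = new HashMap<>();

    KeepaliveTracker(long timeoutMs) {
        this.timeoutMs = timeoutMs;
    }

    // Records a sighting only if the variable is in the machine's keepalive list.
    // Returns true if the record counted as a keepalive, false if it was ignored.
    boolean record(String machineId, String variable, Set<String> keepaliveVars, long timestampMs) {
        if (keepaliveVars == null || !keepaliveVars.contains(variable)) {
            return false;
        }
        // Keep the newest timestamp, since merged streams may arrive out of order.
        lastSeen.merge(machineId, timestampMs, Long::max);
        return true;
    }

    // Stand-in for the punctuator callback: returns machines whose last sighting
    // is older than the timeout, and forgets them so each is reported once.
    List<String> expire(long nowMs) {
        List<String> expired = new ArrayList<>();
        lastSeen.entrySet().removeIf(entry -> {
            if (nowMs - entry.getValue() > timeoutMs) {
                expired.add(entry.getKey());
                return true;
            }
            return false;
        });
        return expired;
    }
}
```

Taking the maximum timestamp in `record` is a deliberate choice here: because the merged stream gives no ordering guarantee across inputs, an older record arriving late should not move "last seen" backwards.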