Hi Alessandro,

For merging the three streams, have you considered the `KStream.merge`
method?
If the values are of different types, you'll need to map them into a common
type first, but I think
something like this will work:

KStream mappedOne = orignalStreamOne.mapValues(...);
KStream mappedTwo = originalStreamTwo.mapValues(...):
KStream mappedThree = originalStreamThree.mapValues(...);

KStream mergedStream = mappedOne.merge(mappedTwo).merge(mappedThree);

Just keep in mind there are no ordering guarantees for the records of the
merged streams.  Also, I made the assumption the keys are
of the same type, if not, then you'll have to change the `mapValues` call
to a `map`.

HTH,
Bill

On Mon, May 11, 2020 at 11:02 PM Alessandro Tagliapietra <
tagliapietra.alessan...@gmail.com> wrote:

> Hello everyone,
>
> we currently use 3 streams (metrics, events, states) and I need to
> implement a keepalive mechanism so that if the machine doesn't send any
> data (from a specific list of variables) it'll emit a value that changes
> the machine state.
>
> For example, in machine 1 the list of keepalive variables are "foo" and
> "bar", to propagate the list of variables I use a configuration topic that
> uses a GlobalKTable so that each stream application can read the machine's
> configuration.
>
> Then my idea was to "merge" all the three streams into one so that I can
> use a ValueTransformer to:
>  - read the configuration store and ignore messages that don't belong to
> the configured variables for a machine
>  - uses a local state store to save the "last seen" of a machine
>  - use the punctuator to emit a "change" in the machine status if the "last
> seen" of a machine is older than some time
>
> To "merge' the 3 streams I was thinking to just map them into a single
> intermediate topic and have the ValueTransformer read from that.
>
> Is there a better way? Maybe without using an intermediate topic?
>
> Thank you in advance
>
> --
> Alessandro Tagliapietra
>

Reply via email to