Hi  Bill,

thank you for replying.

Yes keys are all the same type (machine ID string)

Btw, your solution sounds great, but it'll only work if al the 3 streams
have the same number of partitions, right?
Otherwise there's no guarantee that all the data of the same machine (the
topic keys are the machine IDs) ends up in the same streams instance.
Which is instead guaranteed with the intermediate topic?

Thanks!

--
Alessandro Tagliapietra


On Tue, May 12, 2020 at 7:16 AM Bill Bejeck <b...@confluent.io> wrote:

> Hi Alessandro,
>
> For merging the three streams, have you considered the `KStream.merge`
> method?
> If the values are of different types, you'll need to map them into a common
> type first, but I think
> something like this will work:
>
> KStream mappedOne = orignalStreamOne.mapValues(...);
> KStream mappedTwo = originalStreamTwo.mapValues(...):
> KStream mappedThree = originalStreamThree.mapValues(...);
>
> KStream mergedStream = mappedOne.merge(mappedTwo).merge(mappedThree);
>
> Just keep in mind there are no ordering guarantees for the records of the
> merged streams.  Also, I made the assumption the keys are
> of the same type, if not, then you'll have to change the `mapValues` call
> to a `map`.
>
> HTH,
> Bill
>
> On Mon, May 11, 2020 at 11:02 PM Alessandro Tagliapietra <
> tagliapietra.alessan...@gmail.com> wrote:
>
> > Hello everyone,
> >
> > we currently use 3 streams (metrics, events, states) and I need to
> > implement a keepalive mechanism so that if the machine doesn't send any
> > data (from a specific list of variables) it'll emit a value that changes
> > the machine state.
> >
> > For example, in machine 1 the list of keepalive variables are "foo" and
> > "bar", to propagate the list of variables I use a configuration topic
> that
> > uses a GlobalKTable so that each stream application can read the
> machine's
> > configuration.
> >
> > Then my idea was to "merge" all the three streams into one so that I can
> > use a ValueTransformer to:
> >  - read the configuration store and ignore messages that don't belong to
> > the configured variables for a machine
> >  - uses a local state store to save the "last seen" of a machine
> >  - use the punctuator to emit a "change" in the machine status if the
> "last
> > seen" of a machine is older than some time
> >
> > To "merge' the 3 streams I was thinking to just map them into a single
> > intermediate topic and have the ValueTransformer read from that.
> >
> > Is there a better way? Maybe without using an intermediate topic?
> >
> > Thank you in advance
> >
> > --
> > Alessandro Tagliapietra
> >
>

Reply via email to