Hi All,

Any suggestions on this would really help.

Thanks.

On Tue, Sep 5, 2017 at 2:42 PM, Navneeth Krishnan <reachnavnee...@gmail.com>
wrote:

> Hi All,
>
> I looked at an earlier email thread about broadcasting config through a
> connected stream, but I couldn't find a conclusion.
>
> I can't use the approach below, since I need the config to be published to
> all operator instances, but I also need keyed state for external querying.
>
> streamToBeConfigured.connect(configMessageStream)
> .keyBy(new SomeIdKeySelector(), new ConfigMessageKeySelector())
> .flatMap(new FunctionWithConfigurableState())
> .addSink(...);
>
> One of the resolutions I found in that mail chain is below. I can use it
> to solve my issue, but is this still the recommended approach?
>
> stream1.connect(stream2)
>     // Holds transient state of the last ConfigMessage and maps Stream1's
>     // data to a Tuple2<Stream1Data, ConfigMessage>
>     .map(new MergeStreamsMapFunction())
>     // KeyBy Id to allow for ValueStateDescriptors and semantically correct
>     // partitioning according to business logic
>     .keyBy(new SomeIdKeySelector())
>     // Save latest received ConfigMessage-Value in ValueStateDescriptor here
>     .flatMap(new StatefulFlatMapFunction())
>     .addSink(...);
>
> Thanks,
> Navneeth
>
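
For reference, the merge step in the second quoted snippet can be modeled in
plain Java with no Flink dependency. This is only a sketch under assumptions:
MergeWithLatestConfig, onData, and onConfig are hypothetical names, not Flink
APIs. In a real job the two callbacks would roughly correspond to flatMap1 and
flatMap2 of a CoFlatMapFunction, and the config stream would likely need
broadcast() so that every parallel instance sees each ConfigMessage. Dropping
elements that arrive before the first config is also an assumption of this
sketch.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/**
 * Plain-Java model of the merge step (hypothetical class, not a Flink API).
 * D is the data element type, C is the config message type.
 */
public class MergeWithLatestConfig<D, C> {
    // Transient field: in a real operator this value is lost on restart
    // unless it is also kept in checkpointed state.
    private C latestConfig;

    /** Models the config-side callback: remember the newest config, emit nothing. */
    public void onConfig(C config) {
        latestConfig = config;
    }

    /** Models the data-side callback: pair the element with the latest config. */
    public void onData(D data, Consumer<Map.Entry<D, C>> out) {
        if (latestConfig == null) {
            return; // Assumption: drop elements until a first config arrives.
        }
        out.accept(new SimpleImmutableEntry<>(data, latestConfig));
    }

    public static void main(String[] args) {
        MergeWithLatestConfig<String, String> merger = new MergeWithLatestConfig<>();
        List<Map.Entry<String, String>> out = new ArrayList<>();

        merger.onData("event-0", out::add); // dropped: no config yet
        merger.onConfig("config-v1");
        merger.onData("event-1", out::add);
        merger.onConfig("config-v2");
        merger.onData("event-2", out::add);

        System.out.println(out); // [event-1=config-v1, event-2=config-v2]
    }
}
```

Each emitted pair carries the config alongside the data element, so a
downstream keyed operator can copy it into keyed state, which is what the
quoted pipeline does after keyBy.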
