Hi Aljoscha,

Thanks a lot for your help! Yes, solution 2 pointed me in the right
direction. I ended up connecting them before the window function so I can
filter out uninteresting elements earlier.
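
A minimal sketch of that idea as a plain-Java model (not Flink code, and not
the actual job): the two methods mirror the processElement1/processElement2
callbacks of a CoProcessFunction, parameter updates arrive on input 2, and
records on input 1 are filtered against the latest parameters before any
windowing would happen. The class name, the "allowed log levels" parameter
and the record shape are all assumptions made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of a two-input filter step; no Flink dependency.
class ParameterizedFilterModel {
    // Latest parameters. Assumed here to be the set of log levels to keep.
    private final Set<String> allowedLevels = new HashSet<>();

    // Input 2: a parameter update replaces the current filter.
    void processElement2(Set<String> newAllowedLevels) {
        allowedLevels.clear();
        allowedLevels.addAll(newAllowedLevels);
    }

    // Input 1: a record is forwarded only if it passes the current filter.
    void processElement1(String level, String message, List<String> out) {
        if (allowedLevels.contains(level)) {
            out.add(level + ":" + message);
        }
    }

    // Interleaves records and parameter updates and returns what was forwarded.
    static List<String> demo() {
        ParameterizedFilterModel filter = new ParameterizedFilterModel();
        List<String> out = new ArrayList<>();
        filter.processElement1("ERROR", "a", out); // dropped: no parameters yet
        filter.processElement2(new HashSet<>(Arrays.asList("ERROR")));
        filter.processElement1("ERROR", "b", out); // kept
        filter.processElement1("DEBUG", "c", out); // dropped
        filter.processElement2(new HashSet<>(Arrays.asList("DEBUG")));
        filter.processElement1("DEBUG", "d", out); // kept
        return out;
    }
}
```

Calling ParameterizedFilterModel.demo() returns [ERROR:b, DEBUG:d]. In a real
job only the forwarded records would go on to keyBy and the window, so the
window function never sees the elements that were filtered out.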

thanks,
Lei

On Tue, Jul 4, 2017 at 9:33 AM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> Hi Lei,
>
> I’m afraid there is currently no API for doing this in one operation. I
> see two options right now:
>
> 1. Build a custom operator that implements windowing and also has a second
> input for the parameter stream. This would be a subclass of
> TwoInputStreamOperator. As an example, you can look at
> KeyedCoProcessOperator which is the operator implementation for a two-input
> process function (CoProcessFunction). This variant gives you the most
> flexibility but it’s a bit involved.
>
> 2. Use two separate steps, i.e. first do the windowed operation and then
> have a second operation that combines the window results with the parameter
> stream. Something like this:
>
> DataStream<T> input = …;
> DataStream<P> parameterStream = …;
> input
>   .keyBy(…)
>   .window(…)
>   .reduce()/process()/apply() // the operation that you want to perform
>   .connect(parameterStream)
>   .process(new MyCoProcessFunction());
>
> Where MyCoProcessFunction would receive the results of the windowed
> operation on input 1 and the parameter stream on input 2. The function
> would keep state based on the parameter stream (you should checkpoint this;
> see CheckpointedFunction, and especially
> OperatorStateStore.getUnionListState()) and process elements that come in
> on input 1 based on this state.
>
> Union ListState works like this: each parallel operator instance can put a
> bunch of things in state. When checkpointing, the state of all parallel
> instances is collected and checkpointed. When restoring (after failure, for
> example) all state is sent to each parallel operator instance. In your case
> (I’m assuming that the parameter stream should be broadcast so that all
> parallel operator instances get the same input and therefore have the same
> state) you would only checkpoint the state of parallel operator instance 0.
> When restoring, this would be distributed to all operators and they
> therefore all have the same state again.
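
A dependency-free Java model of that redistribution, as a sketch only: it does
not use the Flink API, and the class, the method names and the sample
parameter strings are hypothetical. It shows why only instance 0 should
contribute to the snapshot when every instance holds the same broadcast state.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of union list state semantics; no Flink dependency.
class UnionStateModel {
    // Snapshot: only parallel instance 0 contributes its parameters,
    // every other instance contributes an empty list.
    static List<String> snapshot(int subtaskIndex, List<String> localParameters) {
        List<String> contribution = new ArrayList<>();
        if (subtaskIndex == 0) {
            contribution.addAll(localParameters);
        }
        return contribution;
    }

    // Restore: every instance receives the union of all snapshotted lists.
    static List<List<String>> restore(List<List<String>> snapshots, int newParallelism) {
        List<String> union = new ArrayList<>();
        for (List<String> contribution : snapshots) {
            union.addAll(contribution);
        }
        List<List<String>> restored = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            restored.add(new ArrayList<>(union));
        }
        return restored;
    }

    // Three instances with identical state snapshot, then four instances restore.
    static List<List<String>> demo() {
        List<String> parameters = Arrays.asList("level=ERROR", "host=web-1");
        List<List<String>> snapshots = new ArrayList<>();
        for (int subtask = 0; subtask < 3; subtask++) {
            snapshots.add(snapshot(subtask, parameters));
        }
        return restore(snapshots, 4);
    }
}
```

In demo(), each of the four restored instances ends up with exactly one copy
of the two parameters. If all three instances had contributed to the snapshot,
each restored instance would hold three copies instead. In an actual
CheckpointedFunction the subtask index would come from
getRuntimeContext().getIndexOfThisSubtask() and the list from
OperatorStateStore.getUnionListState().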
>
> Does that help?
>
> Best,
> Aljoscha
>
> > On 30. Jun 2017, at 21:22, Lei Chen <ley...@gmail.com> wrote:
> >
> > Hi,
> >
> > In my scenario I have 2 streams. DS1 is the main data stream reading logs
> > from Kafka, and DS2 is a parameter stream which is used to maintain state
> > about all processing parameters (including filters) that need to be
> > applied at runtime by DS1. The processing parameters can change at any
> > time while the job is running.
> >
> > DS1 is a windowed stream, DS2 is just a normal non-keyed stream. How can I
> > connect these 2 streams so that DS1 can apply those parameters in its
> > window function by reading the up-to-date parameter state maintained by DS2?
> >
> >
> > thanks
> > Lei
>
>
