Hi Aljoscha,

Thanks a lot for your help! Yes, solution 2 pointed me in the right direction. I ended up connecting the two streams before the window function, so I can filter out uninteresting elements earlier.
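Below is a minimal sketch of the "connect first, filter, then window" idea. It is plain Java with no Flink dependency, so it only models the two inputs of a CoProcessFunction. In a real job this logic would sit in processElement1/processElement2 of a CoProcessFunction placed before keyBy(…).window(…), with the parameter stream broadcast (DataStream#broadcast()) so every parallel instance sees every update. The names LogRecord, ParameterFilter and FilterBeforeWindowDemo are hypothetical, and so is the assumption that the parameters are a set of allowed log levels.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical log record from DS1 (the Kafka log stream).
class LogRecord {
    final String level;
    final String message;

    LogRecord(String level, String message) {
        this.level = level;
        this.message = message;
    }
}

// Plain-Java model of a two-input function (NOT Flink code):
// input 1 = log records, input 2 = parameter updates.
class ParameterFilter {
    // Current parameters, assumed here to be "the set of log levels worth keeping".
    private final Set<String> allowedLevels = new HashSet<>();

    // Input 2: a parameter update replaces the current filter completely.
    void onParameters(Set<String> newAllowedLevels) {
        allowedLevels.clear();
        allowedLevels.addAll(newAllowedLevels);
    }

    // Input 1: forward a record towards the window only if it passes the current filter.
    void onRecord(LogRecord record, Consumer<LogRecord> downstream) {
        if (allowedLevels.contains(record.level)) {
            downstream.accept(record);
        }
    }
}

class FilterBeforeWindowDemo {
    // Feeds interleaved parameter updates and records through the filter
    // and returns what would reach the window operator.
    static List<String> run() {
        ParameterFilter filter = new ParameterFilter();
        List<String> reachedWindow = new ArrayList<>();
        Consumer<LogRecord> downstream = r -> reachedWindow.add(r.level + ":" + r.message);

        filter.onParameters(Set.of("ERROR"));
        filter.onRecord(new LogRecord("ERROR", "disk full"), downstream);
        filter.onRecord(new LogRecord("INFO", "heartbeat"), downstream);

        // Parameters change while the job is running; later records use the new filter.
        filter.onParameters(Set.of("ERROR", "INFO"));
        filter.onRecord(new LogRecord("INFO", "user login"), downstream);
        return reachedWindow;
    }
}
```

One design point worth noting: Flink gives no ordering guarantee between the two inputs of a connected stream, so in this model any record that arrives before the first parameter update is dropped (the filter starts empty). A real job might start from default parameters or buffer early records in state instead.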
thanks,
Lei

On Tue, Jul 4, 2017 at 9:33 AM, Aljoscha Krettek <aljos...@apache.org> wrote:

> Hi Lei,
>
> I’m afraid there is currently no API for doing this in one operation. I see two options right now:
>
> 1. Build a custom operator that implements windowing and also has a second input for the parameter stream. This would be a subclass of TwoInputStreamOperator. As an example, you can look at KeyedCoProcessOperator, which is the operator implementation for a two-input process function (CoProcessFunction). This variant gives you the most flexibility, but it’s a bit involved.
>
> 2. Use two separate steps, i.e. first do the windowed operation and then have a second operation that combines the window results with the parameter stream. Something like this:
>
>     DataStream<T> input = …;
>     DataStream<P> parameterStream = …;
>     input
>         .keyBy(…)
>         .window(…)
>         .reduce()/process()/apply() // the operation that you want to perform
>         .connect(parameterStream)
>         .process(new MyCoProcessFunction());
>
> Here, MyCoProcessFunction would receive the results of the windowed operation on input 1 and the parameter stream on input 2. The function would keep state based on the parameter stream (you should checkpoint this; see CheckpointedFunction, and especially OperatorStateStore.getUnionListState()) and process the elements that come in on input 1 based on this state.
>
> Union ListState works like this: each parallel operator instance can put a bunch of things in state. When checkpointing, the state of all parallel instances is collected and checkpointed. When restoring (after a failure, for example), all of the state is sent to each parallel operator instance. In your case (I’m assuming that the parameter stream should be broadcast so that all parallel operator instances get the same input and therefore have the same state), you would only checkpoint the state of parallel operator instance 0. When restoring, this state would be distributed to all operator instances, so they would all have the same state again.
>
> Does that help?
>
> Best,
> Aljoscha
>
> On 30. Jun 2017, at 21:22, Lei Chen <ley...@gmail.com> wrote:
> >
> > Hi,
> >
> > In my scenario I have two streams. DS1 is the main data stream, reading logs from Kafka, and DS2 is a parameter stream used to maintain state about all the processing parameters (including filters) that DS1 needs to apply at runtime. The processing parameters can change at any time while the job is running.
> >
> > DS1 is a windowed stream; DS2 is just a normal non-keyed stream. How can I connect these two streams so that DS1 can apply those parameters in its window function by reading the up-to-date parameter state maintained by DS2?
> >
> > thanks
> > Lei
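To make the union-list-state restore behaviour described above concrete, here is a plain-Java simulation (no Flink dependency) of one checkpoint/restore cycle for broadcast parameters. ParameterInstance and UnionStateDemo are hypothetical names. In Flink, the snapshot side would correspond to CheckpointedFunction.snapshotState writing into a ListState obtained from OperatorStateStore.getUnionListState(...), and the restore side to initializeState; this model only mirrors that behaviour.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// One parallel instance of a hypothetical operator holding broadcast parameters.
class ParameterInstance {
    final int subtaskIndex;
    final Map<String, String> params = new HashMap<>();

    ParameterInstance(int subtaskIndex) {
        this.subtaskIndex = subtaskIndex;
    }

    // Snapshot: every instance holds identical broadcast state, so only instance 0
    // contributes an entry; the others contribute nothing, avoiding N duplicate copies.
    List<Map<String, String>> snapshot() {
        List<Map<String, String>> entries = new ArrayList<>();
        if (subtaskIndex == 0) {
            entries.add(new HashMap<>(params));
        }
        return entries;
    }

    // Restore: with union list state, every instance receives the entries of all instances.
    void restore(List<Map<String, String>> unionOfAllEntries) {
        params.clear();
        for (Map<String, String> entry : unionOfAllEntries) {
            params.putAll(entry);
        }
    }
}

class UnionStateDemo {
    // Simulates a checkpoint followed by a restore into fresh instances,
    // possibly with a different parallelism.
    static List<ParameterInstance> checkpointAndRestore(List<ParameterInstance> running, int newParallelism) {
        // Checkpoint: collect the list entries of every parallel instance.
        List<Map<String, String>> union = new ArrayList<>();
        for (ParameterInstance instance : running) {
            union.addAll(instance.snapshot());
        }
        // Restore: hand the complete union to each new instance.
        List<ParameterInstance> restored = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            ParameterInstance instance = new ParameterInstance(i);
            instance.restore(union);
            restored.add(instance);
        }
        return restored;
    }
}
```

Because only instance 0 writes, the restored union holds exactly one copy of the parameters, and every instance (even after rescaling) starts from the same state again.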