Hello Aljoscha, Indeed, it seems like I'd need a custom operator. I imagine this involves implementing org.apache.flink.streaming.api.operators.TwoInputStreamOperator? Could you provide those pointers please?
Alex On Wed, Jan 27, 2016 at 12:03 PM, Aljoscha Krettek <aljos...@apache.org> wrote: > Hi, > I’m afraid there is currently now way to do what you want with the builtin > window primitives. Each of the slices of the sliding windows is essentially > evaluated independently. Therefore, there cannot be effects in one slice > that influence processing of another slice. > > What you could do is switch to tumbling windows, then each element would > only be in one window. That probably won’t fit your use case anymore. The > alternative I see to that is to implement everything in a custom operator > where you deal with window states and triggering on time yourself. Let me > know if you need some pointers about that one. > > Cheers, > Aljoscha > > On 26 Jan 2016, at 19:32, Alexander Gryzlov <alex.gryz...@gmail.com> > wrote: > > > > Hello, > > > > I'm trying to implement a left outer join of two Kafka streams within a > sliding window. So far I have the following code: > > > > foos > > .coGroup(bars) > > .where(_.baz).equalTo(_.baz) > > .window(SlidingTimeWindows.of(Time.of(1, TimeUnit.MINUTES), Time.of(1, > TimeUnit.SECONDS))) > > .apply((fs: Iterator[Foo], bs: Iterator[Bar], o: Collector[FooBar]) => > > fs.foreach(f => > > if (bs.isEmpty) > > o.collect(FooBar(f, None)) > > else > > bs.foreach(b => o.collect(FooBar(f, Some(b)))) > > ) > > ) > > > > However, this results in the pair being emitted from every window slide, > regardless of the match. The desired behaviour would be: > > * emit the the match as soon as it's found, don't emit any more pairs > for it, > > * otherwise, emit the empty match, when the left side element leaves the > last of its windows > > > > What would be the idiomatic/efficient way to implement such behaviour? > Is it possible at all with the coGroup/window mechanism, or some other way > is necessary? > > > > Alex > >