Hi! I think this pull request may be implementing what you are looking for: https://github.com/apache/flink/pull/1527
Stephan On Wed, Jan 27, 2016 at 2:06 PM, Alexander Gryzlov <alex.gryz...@gmail.com> wrote: > Hello Aljoscha, > > Indeed, it seems like I'd need a custom operator. I imagine this involves > implementing org.apache.flink.streaming.api.operators.TwoInputStreamOperator? > Could > you provide those pointers please? > > Alex > > On Wed, Jan 27, 2016 at 12:03 PM, Aljoscha Krettek <aljos...@apache.org> > wrote: > >> Hi, >> I’m afraid there is currently now way to do what you want with the >> builtin window primitives. Each of the slices of the sliding windows is >> essentially evaluated independently. Therefore, there cannot be effects in >> one slice that influence processing of another slice. >> >> What you could do is switch to tumbling windows, then each element would >> only be in one window. That probably won’t fit your use case anymore. The >> alternative I see to that is to implement everything in a custom operator >> where you deal with window states and triggering on time yourself. Let me >> know if you need some pointers about that one. >> >> Cheers, >> Aljoscha >> > On 26 Jan 2016, at 19:32, Alexander Gryzlov <alex.gryz...@gmail.com> >> wrote: >> > >> > Hello, >> > >> > I'm trying to implement a left outer join of two Kafka streams within a >> sliding window. So far I have the following code: >> > >> > foos >> > .coGroup(bars) >> > .where(_.baz).equalTo(_.baz) >> > .window(SlidingTimeWindows.of(Time.of(1, TimeUnit.MINUTES), >> Time.of(1, TimeUnit.SECONDS))) >> > .apply((fs: Iterator[Foo], bs: Iterator[Bar], o: Collector[FooBar]) => >> > fs.foreach(f => >> > if (bs.isEmpty) >> > o.collect(FooBar(f, None)) >> > else >> > bs.foreach(b => o.collect(FooBar(f, Some(b)))) >> > ) >> > ) >> > >> > However, this results in the pair being emitted from every window >> slide, regardless of the match. The desired behaviour would be: >> > * emit the the match as soon as it's found, don't emit any more pairs >> for it, >> > * otherwise, emit the empty match, when the left side element leaves >> the last of its windows >> > >> > What would be the idiomatic/efficient way to implement such behaviour? >> Is it possible at all with the coGroup/window mechanism, or some other way >> is necessary? >> > >> > Alex >> >> >