Hello Stephan,

Yes, I've seen this one before, but AFAIU this is a different use case: they need an inner join with two different windows, whereas I'm OK with a single window but need an outer join with different semantics. Their StreamJoinOperator, however, looks like a rough fit, so I'll probably start by hacking it, unless Aljoscha or somebody else with more experience than me has a better idea :)

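To pin down the intended semantics before touching any operator code, here is a minimal plain-Scala sketch of the state such an operator might keep. It deliberately uses no Flink API: `LeftOuterJoinSketch`, its `onFoo`/`onBar`/`onTime` methods and the `ts` timestamp fields are hypothetical names made up for illustration, standing in for wherever an operator's element and time callbacks would sit. It also assumes that each left element is emitted exactly once, and it approximates "leaves the last of its windows" as `ts + windowMillis`, ignoring slide alignment.

```scala
import scala.collection.mutable

// Hypothetical record types; the names mirror Foo/Bar/FooBar from the original question,
// the `ts` (event time in ms) field is an assumption added for this sketch.
final case class Foo(baz: String, ts: Long)
final case class Bar(baz: String, ts: Long)
final case class FooBar(foo: Foo, bar: Option[Bar])

// Plain-Scala model of the join state. This is NOT a Flink operator.
final class LeftOuterJoinSketch(windowMillis: Long) {
  // Left elements still waiting for a match, grouped by join key.
  private val pendingFoos = mutable.Map.empty[String, Vector[Foo]]
  // Right elements seen recently, grouped by join key.
  private val recentBars = mutable.Map.empty[String, Vector[Bar]]

  // Left element arrives: emit immediately if a close-enough Bar is buffered, else park it.
  def onFoo(f: Foo): Seq[FooBar] =
    recentBars.getOrElse(f.baz, Vector.empty[Bar])
      .find(b => math.abs(f.ts - b.ts) < windowMillis) match {
        case Some(b) => Seq(FooBar(f, Some(b)))
        case None =>
          pendingFoos(f.baz) = pendingFoos.getOrElse(f.baz, Vector.empty[Foo]) :+ f
          Seq.empty
      }

  // Right element arrives: emit and forget every parked Foo it matches, then buffer the Bar.
  def onBar(b: Bar): Seq[FooBar] = {
    val (hits, rest) = pendingFoos.getOrElse(b.baz, Vector.empty[Foo])
      .partition(f => math.abs(b.ts - f.ts) < windowMillis)
    if (rest.isEmpty) pendingFoos.remove(b.baz) else pendingFoos(b.baz) = rest
    recentBars(b.baz) = recentBars.getOrElse(b.baz, Vector.empty[Bar]) :+ b
    hits.map(f => FooBar(f, Some(b)))
  }

  // Time advances: emit parked Foos whose window has passed as empty matches, evict old Bars.
  def onTime(now: Long): Seq[FooBar] = {
    val expired = mutable.ArrayBuffer.empty[FooBar]
    for (key <- pendingFoos.keys.toList) { // toList copies the keys, so removal below is safe
      val (old, live) = pendingFoos(key).partition(_.ts + windowMillis <= now)
      expired ++= old.map(f => FooBar(f, None))
      if (live.isEmpty) pendingFoos.remove(key) else pendingFoos(key) = live
    }
    for (key <- recentBars.keys.toList) {
      val live = recentBars(key).filter(_.ts + windowMillis > now)
      if (live.isEmpty) recentBars.remove(key) else recentBars(key) = live
    }
    expired.toSeq
  }
}
```

In a real TwoInputStreamOperator the two element handlers and the time handler would presumably be driven by the operator's own callbacks, and the two maps would have to live in checkpointed state rather than in plain fields; neither of those is shown here.
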
Alex

On Wed, Jan 27, 2016 at 10:53 PM, Stephan Ewen <se...@apache.org> wrote:

> Hi!
>
> I think this pull request may be implementing what you are looking for:
> https://github.com/apache/flink/pull/1527
>
> Stephan
>
>
> On Wed, Jan 27, 2016 at 2:06 PM, Alexander Gryzlov <alex.gryz...@gmail.com> wrote:
>
>> Hello Aljoscha,
>>
>> Indeed, it seems like I'd need a custom operator. I imagine this involves
>> implementing org.apache.flink.streaming.api.operators.TwoInputStreamOperator?
>> Could you provide those pointers please?
>>
>> Alex
>>
>> On Wed, Jan 27, 2016 at 12:03 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>
>>> Hi,
>>> I’m afraid there is currently no way to do what you want with the
>>> built-in window primitives. Each of the slices of the sliding windows is
>>> essentially evaluated independently. Therefore, there cannot be effects in
>>> one slice that influence processing of another slice.
>>>
>>> What you could do is switch to tumbling windows; then each element would
>>> only be in one window. That probably won’t fit your use case anymore. The
>>> alternative I see to that is to implement everything in a custom operator
>>> where you deal with window states and triggering on time yourself. Let me
>>> know if you need some pointers about that one.
>>>
>>> Cheers,
>>> Aljoscha
>>>
>>> > On 26 Jan 2016, at 19:32, Alexander Gryzlov <alex.gryz...@gmail.com> wrote:
>>> >
>>> > Hello,
>>> >
>>> > I'm trying to implement a left outer join of two Kafka streams within
>>> > a sliding window.
>>> > So far I have the following code:
>>> >
>>> > foos
>>> >   .coGroup(bars)
>>> >   .where(_.baz).equalTo(_.baz)
>>> >   .window(SlidingTimeWindows.of(Time.of(1, TimeUnit.MINUTES), Time.of(1, TimeUnit.SECONDS)))
>>> >   .apply { (fs: Iterator[Foo], bs: Iterator[Bar], o: Collector[FooBar]) =>
>>> >     // Materialize the right side once: an Iterator can only be traversed once,
>>> >     // so reusing `bs` for every `f` would see it empty after the first pass.
>>> >     val matches = bs.toList
>>> >     fs.foreach { f =>
>>> >       if (matches.isEmpty)
>>> >         o.collect(FooBar(f, None))                         // no Bar in this window
>>> >       else
>>> >         matches.foreach(b => o.collect(FooBar(f, Some(b)))) // one pair per matching Bar
>>> >     }
>>> >   }
>>> >
>>> > However, this results in the pair being emitted from every window
>>> > slide, regardless of the match. The desired behaviour would be:
>>> > * emit the match as soon as it's found, and don't emit any more pairs
>>> >   for it,
>>> > * otherwise, emit the empty match when the left-side element leaves
>>> >   the last of its windows.
>>> >
>>> > What would be the idiomatic/efficient way to implement such behaviour?
>>> > Is it possible at all with the coGroup/window mechanism, or is some
>>> > other way necessary?
>>> >
>>> > Alex