Hi,

I think hacking the StreamJoinOperator could work for you. In Flink, a StreamOperator is essentially a lower-level version of a FlatMap. It receives elements through its processElement() method and can emit elements using its Output object, output, which is a souped-up Collector that can also emit watermarks. In addition, the StreamOperator has a StreamingRuntimeContext that can be used to register timer callbacks, for example to perform work at specific time intervals or after a timeout. One thing to note is that StreamOperators always deal with StreamRecord<T>, a wrapper around the user type T that also carries a timestamp.
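To make that operator model concrete without depending on a particular Flink version, here is a minimal, Flink-independent Scala sketch. All names in it (Record, MiniOperator, emit, registerTimer, advanceTime, EchoWithTimeout) are hypothetical stand-ins that only mimic the roles described above: a per-element callback, an output, and timer callbacks. They are not Flink's actual StreamOperator API, whose exact signatures should be checked against the Flink release in use.

```scala
import scala.collection.mutable

// Hypothetical stand-in for Flink's StreamRecord<T>: a user value plus a timestamp.
final case class Record[T](value: T, timestamp: Long)

// Hypothetical, heavily simplified operator that mimics the roles described above
// (per-element callback, an output, timer registration). It is NOT Flink's API.
abstract class MiniOperator[IN, OUT] {
  // Plays the part of the operator's output: everything emitted lands here.
  val output: mutable.ArrayBuffer[Record[OUT]] = mutable.ArrayBuffer.empty

  // Pending timers as (fireTime, callback) pairs.
  private val timers = mutable.ArrayBuffer.empty[(Long, () => Unit)]

  // Called once per incoming element, like a FlatMap that also sees timestamps.
  def processElement(record: Record[IN]): Unit

  protected def emit(value: OUT, timestamp: Long): Unit =
    output += Record(value, timestamp)

  protected def registerTimer(time: Long)(callback: () => Unit): Unit =
    timers += ((time, callback))

  // Advance time (e.g. when a watermark arrives) and fire, in time order,
  // every timer that is due at the moment of the call.
  def advanceTime(now: Long): Unit = {
    val (due, pending) = timers.partition(_._1 <= now)
    timers.clear()
    timers ++= pending
    due.sortBy(_._1).foreach { case (_, callback) => callback() }
  }
}

// Usage: echo each element right away, then emit "<value> expired"
// 1000 ms (in event time) after the element's timestamp.
class EchoWithTimeout extends MiniOperator[String, String] {
  def processElement(record: Record[String]): Unit = {
    emit(record.value, record.timestamp)
    val fireAt = record.timestamp + 1000
    registerTimer(fireAt) { () => emit(s"${record.value} expired", fireAt) }
  }
}
```

Feeding "a" at time 0 and "b" at time 500, then advancing time to 1200, yields a, b and "a expired"; the timer for "b" only fires once time reaches 1500. A timeout-driven outer join needs exactly this combination of per-element work and deferred, time-triggered work.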
Cheers,
Aljoscha

> On 28 Jan 2016, at 14:31, Alexander Gryzlov <alex.gryz...@gmail.com> wrote:
>
> Hello Stephan,
>
> Yes, I've seen this one before, but AFAIU this is a different use case: they need an inner join with 2 different windows, whereas I'm OK with a single window but need an outer join with different semantics... Their StreamJoinOperator, however, looks roughly fitting, so I'll probably start by hacking it, unless Aljoscha or somebody else with more experience than me has a better idea :)
>
> Alex
>
> On Wed, Jan 27, 2016 at 10:53 PM, Stephan Ewen <se...@apache.org> wrote:
>
> Hi!
>
> I think this pull request may be implementing what you are looking for:
> https://github.com/apache/flink/pull/1527
>
> Stephan
>
> On Wed, Jan 27, 2016 at 2:06 PM, Alexander Gryzlov <alex.gryz...@gmail.com> wrote:
>
> Hello Aljoscha,
>
> Indeed, it seems like I'd need a custom operator. I imagine this involves implementing org.apache.flink.streaming.api.operators.TwoInputStreamOperator? Could you provide those pointers, please?
>
> Alex
>
> On Wed, Jan 27, 2016 at 12:03 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
>
> Hi,
> I’m afraid there is currently no way to do what you want with the built-in window primitives. Each of the slices of the sliding windows is essentially evaluated independently. Therefore, there cannot be effects in one slice that influence the processing of another slice.
>
> What you could do is switch to tumbling windows; then each element would only be in one window. That probably won’t fit your use case anymore. The alternative I see is to implement everything in a custom operator where you deal with window state and time-based triggering yourself. Let me know if you need some pointers about that one.
>
> Cheers,
> Aljoscha
>
> > On 26 Jan 2016, at 19:32, Alexander Gryzlov <alex.gryz...@gmail.com> wrote:
> >
> > Hello,
> >
> > I'm trying to implement a left outer join of two Kafka streams within a sliding window.
> >
> > So far I have the following code:
> >
> > foos
> >   .coGroup(bars)
> >   .where(_.baz).equalTo(_.baz)
> >   .window(SlidingTimeWindows.of(Time.of(1, TimeUnit.MINUTES), Time.of(1, TimeUnit.SECONDS)))
> >   .apply { (fs: Iterator[Foo], bs: Iterator[Bar], o: Collector[FooBar]) =>
> >     // bs is a one-shot Iterator: materialize it so that every Foo sees all Bars
> >     val barList = bs.toList
> >     fs.foreach { f =>
> >       if (barList.isEmpty)
> >         o.collect(FooBar(f, None))
> >       else
> >         barList.foreach(b => o.collect(FooBar(f, Some(b))))
> >     }
> >   }
> >
> > However, this results in the pair being emitted from every window slide, regardless of the match. The desired behaviour would be:
> > * emit the match as soon as it's found, and don't emit any more pairs for it,
> > * otherwise, emit the empty match when the left-side element leaves the last of its windows.
> >
> > What would be the idiomatic/efficient way to implement such behaviour? Is it possible at all with the coGroup/window mechanism, or is some other way necessary?
> >
> > Alex
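The behaviour Alex asks for can be pinned down with a small, Flink-independent Scala sketch of the bookkeeping a custom two-input operator would need: buffer both sides per key, emit each matching pair exactly once as soon as both elements are present, and emit FooBar(foo, None) only when a never-matched Foo leaves its last window. This is not the StreamJoinOperator from the pull request and not Flink API. The ts field, the class and method names, the in-order input and the simplified "timestamps less than windowMillis apart" match rule are all assumptions made for illustration, and "don't emit any more pairs for it" is read here as "emit each pair once".

```scala
import scala.collection.mutable

// Hypothetical element types: the question only shows Foo, Bar, FooBar and the
// key field baz; the ts (event-time timestamp) field is an assumption.
final case class Foo(baz: String, ts: Long)
final case class Bar(baz: String, ts: Long)
final case class FooBar(foo: Foo, bar: Option[Bar])

// Sketch of the desired left outer join, assuming that
//  * both inputs are fed in timestamp order,
//  * a Foo and a Bar match if they share baz and |foo.ts - bar.ts| < windowMillis,
//  * advanceTime(watermark) is called as event time progresses.
class LeftOuterJoinSketch(windowMillis: Long) {
  val output = mutable.ArrayBuffer.empty[FooBar]

  // A buffered Foo plus whether it has been matched at least once.
  private final class PendingFoo(val foo: Foo) { var matched = false }

  private val lefts = mutable.Map.empty[String, mutable.ArrayBuffer[PendingFoo]]
  private val rights = mutable.Map.empty[String, mutable.ArrayBuffer[Bar]]

  private def leftBucket(key: String) =
    lefts.getOrElseUpdate(key, mutable.ArrayBuffer.empty[PendingFoo])
  private def rightBucket(key: String) =
    rights.getOrElseUpdate(key, mutable.ArrayBuffer.empty[Bar])

  private def inRange(f: Foo, b: Bar): Boolean = math.abs(f.ts - b.ts) < windowMillis

  // A new Foo is paired once with every buffered Bar in range, immediately.
  def processLeft(f: Foo): Unit = {
    val pending = new PendingFoo(f)
    for (b <- rightBucket(f.baz) if inRange(f, b)) {
      output += FooBar(f, Some(b))
      pending.matched = true
    }
    leftBucket(f.baz) += pending
  }

  // A new Bar is paired once with every buffered Foo in range, immediately.
  def processRight(b: Bar): Unit = {
    for (p <- leftBucket(b.baz) if inRange(p.foo, b)) {
      output += FooBar(p.foo, Some(b))
      p.matched = true
    }
    rightBucket(b.baz) += b
  }

  // A Foo leaves its last window once watermark >= foo.ts + windowMillis;
  // only then is an unmatched Foo emitted as FooBar(foo, None).
  def advanceTime(watermark: Long): Unit = {
    for (buf <- lefts.values) {
      val (expired, alive) = buf.partition(_.foo.ts + windowMillis <= watermark)
      for (p <- expired if !p.matched) output += FooBar(p.foo, None)
      buf.clear()
      buf ++= alive
    }
    // A Bar can no longer match any future Foo once bar.ts + windowMillis <= watermark.
    for (buf <- rights.values) {
      val alive = buf.filter(_.ts + windowMillis > watermark)
      buf.clear()
      buf ++= alive
    }
  }
}
```

In an actual TwoInputStreamOperator, processLeft and processRight would roughly correspond to processElement1() and processElement2(), and advanceTime to watermark handling or a registered timer. A production version would also need its buffers to be part of checkpointed state.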