Re: Streaming left outer join

2016-01-28 Thread Aljoscha Krettek
Hi, I think hacking the StreamJoinOperator could work for you. In Flink, a StreamOperator is essentially a lower-level version of a FlatMap. It receives elements via the processElement() method and can emit elements using the Output object output, which is a souped-up Collector that also allows

Re: Streaming left outer join

2016-01-28 Thread Alexander Gryzlov
Hello Stephan, Yes, I've seen this one before, but AFAIU this is a different use-case: they need an inner join with 2 different windows, whereas I'm ok with a single window, but need an outer join with different semantics... Their StreamJoinOperator, however, looks like a rough fit, so I'll probably

Re: Streaming left outer join

2016-01-27 Thread Stephan Ewen
Hi! I think this pull request may be implementing what you are looking for: https://github.com/apache/flink/pull/1527 Stephan On Wed, Jan 27, 2016 at 2:06 PM, Alexander Gryzlov wrote: > Hello Aljoscha, > > Indeed, it seems like I'd need a custom operator. I imagine this involves > implementin

Re: Streaming left outer join

2016-01-27 Thread Alexander Gryzlov
Hello Aljoscha, Indeed, it seems like I'd need a custom operator. I imagine this involves implementing org.apache.flink.streaming.api.operators.TwoInputStreamOperator? Could you provide those pointers please? Alex On Wed, Jan 27, 2016 at 12:03 PM, Aljoscha Krettek wrote: > Hi, > I’m afraid the

Re: Streaming left outer join

2016-01-27 Thread Aljoscha Krettek
Hi, I’m afraid there is currently no way to do what you want with the built-in window primitives. Each of the slices of the sliding windows is essentially evaluated independently. Therefore, there cannot be effects in one slice that influence processing of another slice. What you could do is sw
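
To illustrate why the slices are independent, here is a minimal sketch in plain Scala (no Flink dependency) of how a single element can fall into many overlapping sliding windows. The object and method names are hypothetical, and the arithmetic is an assumption about the usual sliding-window assignment for non-negative timestamps, not a copy of Flink's code.

```scala
object SlidingWindowSketch {
  // Start timestamps (ms) of every sliding window that contains `timestamp`,
  // for windows of length `size` that start every `slide` ms.
  // Assumes non-negative timestamps and window starts aligned to multiples of `slide`.
  def windowStarts(timestamp: Long, size: Long, slide: Long): List[Long] = {
    val lastStart = timestamp - (timestamp % slide) // latest window containing the element
    Iterator
      .iterate(lastStart)(_ - slide)          // walk back one slide at a time
      .takeWhile(_ > timestamp - size)        // stop once the window no longer covers the element
      .toList
  }
}
```

With the 1-minute window sliding every second from the original question, `SlidingWindowSketch.windowStarts(61500L, 60000L, 1000L)` yields 60 window starts, so the same element is handed to 60 separate window evaluations, and none of them sees what the others emitted.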

Streaming left outer join

2016-01-26 Thread Alexander Gryzlov
Hello, I'm trying to implement a left outer join of two Kafka streams within a sliding window. So far I have the following code: foos .coGroup(bars) .where(_.baz).equalTo(_.baz) .window(SlidingTimeWindows.of(Time.of(1, TimeUnit.MINUTES), Time.of(1, TimeUnit.SECONDS))) .apply((fs: Iterator
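
The snippet above is cut off in the archive, so the following is not the poster's code. It is a minimal sketch in plain Scala of the per-group logic a left outer join needs once both sides have been co-grouped by key and window. The case classes `Foo` and `Bar`, their `payload` field, and the `leftOuter` helper are hypothetical; only the key field `baz` comes from the post.

```scala
// Hypothetical record types; only the key field `baz` appears in the original post.
case class Foo(baz: String, payload: Int)
case class Bar(baz: String, payload: Int)

object LeftOuterJoinSketch {
  // Left outer join of two groups that already share the same key and window:
  // every Foo is emitted, paired with each matching Bar, or with None if there is none.
  def leftOuter(fs: Iterator[Foo], bs: Iterator[Bar]): Iterator[(Foo, Option[Bar])] = {
    val bars = bs.toList // materialise: the right side is traversed once per left element
    fs.flatMap { f =>
      if (bars.isEmpty) Iterator.single((f, Option.empty[Bar]))
      else bars.iterator.map(b => (f, Option(b)))
    }
  }
}
```

A function of this shape could presumably be passed to the `apply` call of the co-grouped stream. Note that it only joins within one window evaluation, so with overlapping sliding windows the same pair may be emitted once per window that contains both elements.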