Hi,
I think hacking the StreamJoinOperator could work for you. In Flink, a 
StreamOperator is essentially a lower-level version of a FlatMap. It 
receives elements through the processElement() method and can emit elements 
using the Output object "output", which is a souped-up Collector that also 
allows for watermark emission. The StreamOperator also has a 
StreamingRuntimeContext that can be used to register timer callbacks, so it 
can perform work at specific time intervals or after timeouts. One thing to 
note is that StreamOperators always deal with StreamRecord<T>, which is a 
wrapper around the user type T that also carries a timestamp.
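
To make the idea a bit more concrete, here is a rough sketch of the join logic 
you described (emit a match as soon as it is found, otherwise emit the empty 
match once the left element times out). It is plain Scala with no Flink 
dependencies, so everything in it is made up for illustration: Stamped is only 
a stand-in for StreamRecord<T>, and processLeft, processRight and onTime merely 
mimic the two element callbacks of a two-input operator and a timer callback. 
Treat it as a starting point under those assumptions, not as the actual 
operator API.

```scala
import scala.collection.mutable

// Hypothetical stand-in for Flink's StreamRecord<T>: a value plus its timestamp.
final case class Stamped[T](value: T, timestamp: Long)

// Sketch of the join logic only. In a real operator the three methods would
// roughly correspond to the two element callbacks and a timer callback.
class LeftOuterJoinSketch[K, L, R](
    leftKey: L => K,
    rightKey: R => K,
    timeoutMs: Long,
    emit: ((L, Option[R])) => Unit) {

  // Left elements still waiting for a match, grouped by key.
  private val pendingLeft = mutable.Map.empty[K, mutable.Buffer[Stamped[L]]]
  // Right elements kept around so that later left elements can still match.
  private val seenRight = mutable.Map.empty[K, mutable.Buffer[Stamped[R]]]

  def processLeft(rec: Stamped[L]): Unit = {
    val k = leftKey(rec.value)
    seenRight.get(k).flatMap(_.headOption) match {
      // A right element is already buffered: emit the match immediately.
      case Some(r) => emit((rec.value, Some(r.value)))
      // No match yet: remember the left element until a match or the timeout.
      case None =>
        pendingLeft.getOrElseUpdate(k, mutable.Buffer.empty[Stamped[L]]) += rec
    }
  }

  def processRight(rec: Stamped[R]): Unit = {
    val k = rightKey(rec.value)
    // Emit every waiting left element exactly once, then forget it.
    pendingLeft.remove(k).foreach(_.foreach(l => emit((l.value, Some(rec.value)))))
    seenRight.getOrElseUpdate(k, mutable.Buffer.empty[Stamped[R]]) += rec
  }

  // Stand-in for a timer callback: flush everything older than the timeout.
  def onTime(now: Long): Unit = {
    for ((k, ls) <- pendingLeft.toList) {
      val (expired, alive) = ls.partition(_.timestamp + timeoutMs <= now)
      // Unmatched left elements leave with an empty right side.
      expired.foreach(l => emit((l.value, None)))
      if (alive.isEmpty) pendingLeft.remove(k) else pendingLeft(k) = alive
    }
    for ((k, rs) <- seenRight.toList) {
      val alive = rs.filter(_.timestamp + timeoutMs > now)
      if (alive.isEmpty) seenRight.remove(k) else seenRight(k) = alive
    }
  }
}
```

In an actual operator the two maps would have to live in checkpointed state, 
and onTime would be driven by registered timers or by watermarks rather than 
called by hand.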

Cheers,
Aljoscha
> On 28 Jan 2016, at 14:31, Alexander Gryzlov <alex.gryz...@gmail.com> wrote:
> 
> Hello Stephan,
> 
> Yes, I've seen this one before, but AFAIU this is a different use-case: they 
> need an inner join with 2 different windows, whereas I'm ok with a single 
> window, but need an outer join with different semantics... Their 
> StreamJoinOperator, however looks roughly fitting, so I'll probably start by 
> hacking it; unless Aljoscha or somebody else with more experience than me has 
> a better idea :)
> 
> Alex
> 
> On Wed, Jan 27, 2016 at 10:53 PM, Stephan Ewen <se...@apache.org> wrote:
> Hi!
> 
> I think this pull request may be implementing what you are looking for: 
> https://github.com/apache/flink/pull/1527
> 
> Stephan
> 
> 
> On Wed, Jan 27, 2016 at 2:06 PM, Alexander Gryzlov <alex.gryz...@gmail.com> 
> wrote:
> Hello Aljoscha,
> 
> Indeed, it seems like I'd need a custom operator. I imagine this involves 
> implementing org.apache.flink.streaming.api.operators.TwoInputStreamOperator? 
> Could you provide those pointers please? 
> 
> Alex
> 
> On Wed, Jan 27, 2016 at 12:03 PM, Aljoscha Krettek <aljos...@apache.org> 
> wrote:
> Hi,
> I’m afraid there is currently no way to do what you want with the built-in 
> window primitives. Each of the slices of the sliding windows is essentially 
> evaluated independently. Therefore, there cannot be effects in one slice that 
> influence processing of another slice.
> 
> What you could do is switch to tumbling windows, then each element would only 
> be in one window. That probably won’t fit your use case anymore. The 
> alternative I see to that is to implement everything in a custom operator 
> where you deal with window states and triggering on time yourself. Let me 
> know if you need some pointers about that one.
> 
> Cheers,
> Aljoscha
> > On 26 Jan 2016, at 19:32, Alexander Gryzlov <alex.gryz...@gmail.com> wrote:
> >
> > Hello,
> >
> > I'm trying to implement a left outer join of two Kafka streams within a 
> > sliding window. So far I have the following code:
> >
> > foos
> >   .coGroup(bars)
> >   .where(_.baz).equalTo(_.baz)
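> >   .window(SlidingTimeWindows.of(Time.of(1, TimeUnit.MINUTES), Time.of(1, TimeUnit.SECONDS)))
> >   .apply((fs: Iterator[Foo], bs: Iterator[Bar], o: Collector[FooBar]) => {
> >     // Materialize the right side: an Iterator can only be traversed once,
> >     // so reusing bs for every f would see it empty after the first pass.
> >     val rights = bs.toList
> >     fs.foreach(f =>
> >       if (rights.isEmpty)
> >         o.collect(FooBar(f, None))
> >       else
> >         rights.foreach(b => o.collect(FooBar(f, Some(b))))
> >     )
> >   })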
> >
> > However, this results in the pair being emitted from every window slide, 
> > regardless of the match. The desired behaviour would be:
> > * emit the match as soon as it's found, don't emit any more pairs for 
> > it,
> > * otherwise, emit the empty match, when the left side element leaves the 
> > last of its windows
> >
> > What would be the idiomatic/efficient way to implement such behaviour? Is 
> > it possible at all with the coGroup/window mechanism, or is some other 
> > approach necessary?
> >
> > Alex
> 
> 
> 
> 
