Hello Stephan,

Yes, I've seen this one before, but AFAIU this is a different use case:
they need an inner join with 2 different windows, whereas I'm OK with a
single window but need an outer join with different semantics.
Their StreamJoinOperator, however, looks like a rough fit, so I'll probably
start by hacking on it, unless Aljoscha or somebody else with more experience
than me has a better idea :)

Alex

On Wed, Jan 27, 2016 at 10:53 PM, Stephan Ewen <se...@apache.org> wrote:

> Hi!
>
> I think this pull request may be implementing what you are looking for:
> https://github.com/apache/flink/pull/1527
>
> Stephan
>
>
> On Wed, Jan 27, 2016 at 2:06 PM, Alexander Gryzlov <alex.gryz...@gmail.com> wrote:
>
>> Hello Aljoscha,
>>
>> Indeed, it seems like I'd need a custom operator. I imagine this involves
>> implementing org.apache.flink.streaming.api.operators.TwoInputStreamOperator?
>> Could you provide those pointers, please?
>>
>> Alex
>>
>> On Wed, Jan 27, 2016 at 12:03 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>
>>> Hi,
>>> I’m afraid there is currently no way to do what you want with the
>>> built-in window primitives. Each slice of the sliding window is
>>> essentially evaluated independently. Therefore, nothing that happens in
>>> one slice can influence the processing of another slice.
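To see why the quoted coGroup emits the same pair repeatedly, here is a small plain-Scala sketch (no Flink dependency) of sliding-window assignment. It assumes windows aligned to multiples of the slide with no offset, which is our understanding of how Flink's sliding assigners place windows; `slidingWindowStarts` is a hypothetical helper, not a Flink API, and the timestamps are made up for illustration.

```scala
// Hypothetical helper: start times of all sliding windows containing timestamp t,
// ASSUMING windows are aligned to multiples of `slide` (no offset) and cover
// the half-open interval [start, start + size).
def slidingWindowStarts(t: Long, size: Long, slide: Long): List[Long] = {
  val lastStart = t - Math.floorMod(t, slide) // latest window start <= t
  // Step back one slide at a time while the window still contains t.
  Iterator.iterate(lastStart)(_ - slide).takeWhile(_ > t - size).toList
}

val sizeMs  = 60000L // 1 minute, as in the quoted coGroup code
val slideMs = 1000L  // 1 second

val fooWindows = slidingWindowStarts(123456L, sizeMs, slideMs) // a Foo at t = 123.456 s
val barWindows = slidingWindowStarts(130000L, sizeMs, slideMs) // a matching Bar 6.544 s later
// Each window is evaluated independently, so the pair is emitted once per shared window.
val shared = fooWindows.intersect(barWindows)

println(s"Foo falls into ${fooWindows.size} windows") // prints: Foo falls into 60 windows
println(s"pair is emitted ${shared.size} times")      // prints: pair is emitted 53 times
```

Passing `slide = size` (tumbling windows) yields a single window per element, e.g. `slidingWindowStarts(123456L, 60000L, 60000L) == List(120000L)`, so each pair can be emitted at most once.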
>>>
>>> What you could do is switch to tumbling windows; then each element would
>>> be in only one window. That probably won’t fit your use case anymore,
>>> though. The alternative I see is to implement everything in a custom
>>> operator where you handle window state and time-based triggering
>>> yourself. Let me know if you need some pointers about that one.
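A custom operator along these lines would keep per-key state and react to event-time progress itself. The following is a minimal plain-Scala model of that state and timer logic, not Flink operator code: `LeftOuterWindowJoin`, `processFoo`, `processBar` and `advanceWatermark` are hypothetical names, the `Foo`/`Bar`/`FooBar` case classes are stand-ins for the types in the original question, and it assumes that a Foo and a Bar match when their timestamps are less than one window size apart and that no element arrives behind the watermark. In a real operator, the two process methods would correspond to the two inputs and `advanceWatermark` to watermark handling.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for the thread's Foo/Bar/FooBar types.
final case class Foo(baz: String, ts: Long)
final case class Bar(baz: String, ts: Long)
final case class FooBar(foo: Foo, bar: Option[Bar])

// Plain-Scala model of a left outer join that emits each Foo exactly once.
// ASSUMPTION: Foo and Bar match when |foo.ts - bar.ts| < sizeMs.
final class LeftOuterWindowJoin(sizeMs: Long, out: FooBar => Unit) {
  // Foos still waiting for a partner, keyed by the join key.
  private val pendingFoos = mutable.Map.empty[String, mutable.ArrayBuffer[Foo]]
  // Recent Bars, kept so that a Foo arriving later can still find them.
  private val recentBars = mutable.Map.empty[String, mutable.ArrayBuffer[Bar]]

  private def withinWindow(f: Foo, b: Bar): Boolean = math.abs(f.ts - b.ts) < sizeMs

  // Left input: emit immediately if a buffered Bar matches, otherwise wait.
  def processFoo(f: Foo): Unit =
    recentBars.getOrElse(f.baz, mutable.ArrayBuffer.empty[Bar]).find(withinWindow(f, _)) match {
      case Some(b) => out(FooBar(f, Some(b))) // matched: emit once, keep no state for f
      case None    => pendingFoos.getOrElseUpdate(f.baz, mutable.ArrayBuffer.empty) += f
    }

  // Right input: complete every pending Foo it matches, each exactly once.
  def processBar(b: Bar): Unit = {
    recentBars.getOrElseUpdate(b.baz, mutable.ArrayBuffer.empty) += b
    pendingFoos.get(b.baz).foreach { foos =>
      val (matched, rest) = foos.partition(withinWindow(_, b))
      matched.foreach(f => out(FooBar(f, Some(b))))
      foos.clear(); foos ++= rest
    }
  }

  // Event-time progress: a Foo whose last window has closed gets the empty match;
  // Bars too old to match any future Foo are dropped.
  def advanceWatermark(wm: Long): Unit = {
    pendingFoos.values.foreach { foos =>
      val (expired, alive) = foos.partition(_.ts + sizeMs <= wm)
      expired.foreach(f => out(FooBar(f, None)))
      foos.clear(); foos ++= alive
    }
    recentBars.values.foreach { bars =>
      val alive = bars.filter(_.ts + sizeMs > wm)
      bars.clear(); bars ++= alive
    }
  }
}

// Usage: collect emitted results in a buffer.
val emitted = mutable.ArrayBuffer.empty[FooBar]
val join = new LeftOuterWindowJoin(60000L, fb => emitted += fb)

join.processFoo(Foo("a", 1000L)) // no Bar yet: this Foo waits
join.processBar(Bar("a", 5000L)) // matches it: FooBar(Foo(a,1000), Some(Bar(a,5000)))
join.processBar(Bar("a", 6000L)) // Foo already matched: nothing new is emitted
join.processFoo(Foo("b", 2000L)) // no Bar with key "b"
join.advanceWatermark(70000L)    // the "b" Foo's last window has closed: empty match
emitted.foreach(println)
```

Dropping a Foo from state as soon as it is matched is what gives the "emit once" behaviour; the watermark is the only point where an unmatched Foo is completed with `None`.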
>>>
>>> Cheers,
>>> Aljoscha
>>> > On 26 Jan 2016, at 19:32, Alexander Gryzlov <alex.gryz...@gmail.com> wrote:
>>> >
>>> > Hello,
>>> >
>>> > I'm trying to implement a left outer join of two Kafka streams within
>>> > a sliding window. So far I have the following code:
>>> >
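>>> > import java.util.concurrent.TimeUnit
>>> > import org.apache.flink.streaming.api.scala._
>>> > import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows
>>> > import org.apache.flink.streaming.api.windowing.time.Time
>>> > import org.apache.flink.util.Collector
>>> >
>>> > foos
>>> >   .coGroup(bars)
>>> >   .where(_.baz).equalTo(_.baz)
>>> >   .window(SlidingTimeWindows.of(Time.of(1, TimeUnit.MINUTES), Time.of(1, TimeUnit.SECONDS)))
>>> >   .apply { (fs: Iterator[Foo], bs: Iterator[Bar], o: Collector[FooBar]) =>
>>> >     // Materialize the right side once: an Iterator can only be traversed
>>> >     // once, so reusing `bs` for every Foo would leave it empty after the first.
>>> >     val barList = bs.toList
>>> >     fs.foreach { f =>
>>> >       if (barList.isEmpty)
>>> >         o.collect(FooBar(f, None))
>>> >       else
>>> >         barList.foreach(b => o.collect(FooBar(f, Some(b))))
>>> >     }
>>> >   }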
>>> >
>>> > However, this results in the pair being emitted from every window slide,
>>> > regardless of whether a match was already found. The desired behaviour
>>> > would be:
>>> > * emit the match as soon as it's found, and don't emit any more pairs
>>> >   for it;
>>> > * otherwise, emit the empty match when the left-side element leaves the
>>> >   last of its windows.
>>> >
>>> > What would be the idiomatic/efficient way to implement such behaviour?
>>> > Is it possible at all with the coGroup/window mechanism, or is some
>>> > other approach necessary?
>>> >
>>> > Alex
>>>
>>>
>>
>
