Hello,

I'm trying to implement a left outer join of two Kafka streams within a
sliding window. So far I have the following code:

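foos
  .coGroup(bars)
  .where(_.baz).equalTo(_.baz)
  .window(SlidingTimeWindows.of(
    Time.of(1, TimeUnit.MINUTES), Time.of(1, TimeUnit.SECONDS)))
  .apply((fs: Iterator[Foo], bs: Iterator[Bar], o: Collector[FooBar]) => {
    // Materialize the right side once: an Iterator can only be traversed
    // once, so it would be exhausted after the first left element.
    val matched = bs.toList
    fs.foreach(f =>
      if (matched.isEmpty)
        o.collect(FooBar(f, None))
      else
        matched.foreach(b => o.collect(FooBar(f, Some(b))))
    )
  })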

However, this emits a pair on every window slide, whether or not a match
was found. The desired behaviour would be:
* emit the match as soon as it's found, and don't emit any more pairs for
it,
* otherwise, emit the empty match when the left-side element leaves the
last of its windows.
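
To make the intended semantics concrete, here is a minimal plain-Scala
sketch of the behaviour I'm after. It does not use any Flink API. The class
LeftJoinBuffer, its methods onFoo/onBar/onTime, and the `id` fields are
hypothetical names made up for illustration; only `baz` comes from the code
above. It also assumes that an element "leaves the last of its windows" one
window length after it arrives, and that a left element matching several
buffered right elements emits one pair for each of them.

```scala
// Hypothetical stand-ins for the real types; only `baz` is from the post.
case class Foo(baz: String, id: Int)
case class Bar(baz: String, id: Int)
case class FooBar(foo: Foo, bar: Option[Bar])

// Plain-Scala model of the desired left outer join, not Flink code.
// `windowMs` plays the role of the one-minute window size.
class LeftJoinBuffer(windowMs: Long) {
  // Unmatched left elements, each with the time it leaves its last window.
  private var pendingFoos = Vector.empty[(Foo, Long)]
  // Right elements kept around so later left elements can still match them.
  private var recentBars = Vector.empty[(Bar, Long)]

  // A left element arrives: emit matches right away, otherwise park it.
  def onFoo(f: Foo, now: Long): Seq[FooBar] = {
    val matches = recentBars.collect {
      case (b, _) if b.baz == f.baz => FooBar(f, Some(b))
    }
    if (matches.isEmpty) pendingFoos = pendingFoos :+ ((f, now + windowMs))
    matches
  }

  // A right element arrives: emit every parked left element it matches,
  // and stop tracking those so they are never emitted again.
  def onBar(b: Bar, now: Long): Seq[FooBar] = {
    recentBars = recentBars :+ ((b, now + windowMs))
    val (hit, rest) = pendingFoos.partition(_._1.baz == b.baz)
    pendingFoos = rest
    hit.map { case (f, _) => FooBar(f, Some(b)) }
  }

  // Time advances: left elements past their deadline get the empty match.
  def onTime(now: Long): Seq[FooBar] = {
    val (expired, live) = pendingFoos.partition(_._2 <= now)
    pendingFoos = live
    recentBars = recentBars.filter(_._2 > now)
    expired.map { case (f, _) => FooBar(f, None) }
  }
}
```

Each left element is therefore emitted either with its match(es) at the
moment a match becomes available, or once with None when its deadline
passes, and never on every slide.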

What would be the idiomatic/efficient way to implement such behaviour? Is
it possible at all with the coGroup/window mechanism, or is some other
approach necessary?

Alex
