Hello, I'm trying to implement a left outer join of two Kafka streams within a sliding window. So far I have the following code:
foos .coGroup(bars) .where(_.baz).equalTo(_.baz) .window(SlidingTimeWindows.of(Time.of(1, TimeUnit.MINUTES), Time.of(1, TimeUnit.SECONDS))) .apply((fs: Iterator[Foo], bs: Iterator[Bar], o: Collector[FooBar]) => fs.foreach(f => if (bs.isEmpty) o.collect(FooBar(f, None)) else bs.foreach(b => o.collect(FooBar(f, Some(b)))) ) ) However, this results in the pair being emitted from every window slide, regardless of the match. The desired behaviour would be: * emit the the match as soon as it's found, don't emit any more pairs for it, * otherwise, emit the empty match, when the left side element leaves the last of its windows What would be the idiomatic/efficient way to implement such behaviour? Is it possible at all with the coGroup/window mechanism, or some other way is necessary? Alex