Hi all, I'm trying to do a KTable-KTable join to compute an average within a tumbling window. Here's the KStreams code (I've put a fully working example in a gist: https://gist.github.com/timothyrenner/a99c86b2d6ed2c22c8703e8c7760af3a) KStreamBuilder builder = new KStreamBuilder();
KStream<String, Long> longs = builder.stream( Serdes.String(), Serdes.Long(), "longs"); KTable<Windowed<String>, Long> longCounts = longs.countByKey(TimeWindows.of("longCounts", 10000L), Serdes.String()); KTable<Windowed<String>, Long> longSums = longs.reduceByKey((v1, v2) -> v1 + v2, TimeWindows.of("longSums", 10000L), Serdes.String(), Serdes.Long()); KTable<Windowed<String>, Double> longAvgs = longSums.join(longCounts, (sum, count) -> sum.doubleValue()/count.doubleValue()); longAvgs.toStream((wk, v) -> wk.key()) .to(Serdes.String(), Serdes.Double(), "long-avgs"); KafkaStreams streams = new KafkaStreams(builder, config); streams.start(); When I run this, I get the following exception: java.util.NoSuchElementException at org.apache.kafka.streams.state.internals.RocksDBWindowStore$RocksDBWindowStoreIterator.next(RocksDBWindowStore.java:95) at org.apache.kafka.streams.state.internals.RocksDBWindowStore$RocksDBWindowStoreIterator.next(RocksDBWindowStore.java:64) at org.apache.kafka.streams.state.internals.MeteredWindowStore$MeteredWindowStoreIterator.next(MeteredWindowStore.java:136) at org.apache.kafka.streams.state.internals.MeteredWindowStore$MeteredWindowStoreIterator.next(MeteredWindowStore.java:117) at org.apache.kafka.streams.kstream.internals.KStreamWindowReduce$KStreamAggregateValueGetter.get(KStreamWindowReduce.java:166) at org.apache.kafka.streams.kstream.internals.KStreamWindowReduce$KStreamAggregateValueGetter.get(KStreamWindowReduce.java:147) at org.apache.kafka.streams.kstream.internals.KTableKTableJoin$KTableKTableJoinProcessor.process(KTableKTableJoin.java:77) at org.apache.kafka.streams.kstream.internals.KTableKTableJoin$KTableKTableJoinProcessor.process(KTableKTableJoin.java:48) at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68) at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338) at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187) at org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate$KStreamWindowAggregateProcessor.process(KStreamWindowAggregate.java:136) at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68) at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338) at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187) at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:64) at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:174) at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:320) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218) Looks like the join is throwing the exception. Any ideas? Thanks, Tim