Hi all,

I'm trying to do a KTable-KTable join to compute an average within a
tumbling window.
Here's the Kafka Streams code (a fully working example is in this gist:
https://gist.github.com/timothyrenner/a99c86b2d6ed2c22c8703e8c7760af3a):

KStreamBuilder builder = new KStreamBuilder();

// Source stream of (String key, Long value) records from the "longs" topic.
KStream<String, Long> longs = builder.stream(
    Serdes.String(), Serdes.Long(), "longs");

// Record count per key per 10-second window.
KTable<Windowed<String>, Long> longCounts =
    longs.countByKey(TimeWindows.of("longCounts", 10000L),
                     Serdes.String());

// Sum of values per key per 10-second window.
KTable<Windowed<String>, Long> longSums =
    longs.reduceByKey((v1, v2) -> v1 + v2,
                      TimeWindows.of("longSums", 10000L),
                      Serdes.String(),
                      Serdes.Long());

// Join the two windowed tables to get the average: sum / count.
KTable<Windowed<String>, Double> longAvgs =
    longSums.join(longCounts,
                  (sum, count) ->
                      sum.doubleValue() / count.doubleValue());

// Drop the window from the key and write the averages to "long-avgs".
longAvgs.toStream((wk, v) -> wk.key())
        .to(Serdes.String(),
            Serdes.Double(),
            "long-avgs");

KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();
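
To make the intended result concrete, here is a minimal plain-Java sketch of the value the topology above is meant to produce: a per-key average over 10-second tumbling windows. It does not use Kafka Streams at all. The class TumblingAverage and its methods are hypothetical names chosen for illustration, and it assumes non-negative timestamps in milliseconds.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of Kafka Streams: computes per-key averages
// over tumbling windows in plain Java.
class TumblingAverage {
    // Window size in milliseconds, matching the 10000L used in the topology.
    static final long WINDOW_MS = 10_000L;

    // Running {sum, count} per "key@windowStart".
    private final Map<String, long[]> state = new HashMap<>();

    // A tumbling window starts at the largest multiple of WINDOW_MS that is
    // less than or equal to the timestamp (assumes timestampMs >= 0).
    static long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % WINDOW_MS);
    }

    // Folds one record into its window and returns that window's updated average.
    double add(String key, long timestampMs, long value) {
        String windowedKey = key + "@" + windowStart(timestampMs);
        long[] sumAndCount = state.computeIfAbsent(windowedKey, k -> new long[2]);
        sumAndCount[0] += value; // running sum
        sumAndCount[1] += 1;     // running count
        return (double) sumAndCount[0] / sumAndCount[1];
    }

    public static void main(String[] args) {
        TumblingAverage avg = new TumblingAverage();
        System.out.println(avg.add("a", 1_000L, 4L));   // 4.0
        System.out.println(avg.add("a", 9_999L, 6L));   // 5.0 (same window)
        System.out.println(avg.add("a", 10_000L, 10L)); // 10.0 (new window)
    }
}
```

Keeping the sum and count together in one piece of state is a design choice of this sketch only; the topology above keeps them in two separate windowed tables and joins them.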

When I run this, I get the following exception:

java.util.NoSuchElementException
    at org.apache.kafka.streams.state.internals.RocksDBWindowStore$RocksDBWindowStoreIterator.next(RocksDBWindowStore.java:95)
    at org.apache.kafka.streams.state.internals.RocksDBWindowStore$RocksDBWindowStoreIterator.next(RocksDBWindowStore.java:64)
    at org.apache.kafka.streams.state.internals.MeteredWindowStore$MeteredWindowStoreIterator.next(MeteredWindowStore.java:136)
    at org.apache.kafka.streams.state.internals.MeteredWindowStore$MeteredWindowStoreIterator.next(MeteredWindowStore.java:117)
    at org.apache.kafka.streams.kstream.internals.KStreamWindowReduce$KStreamAggregateValueGetter.get(KStreamWindowReduce.java:166)
    at org.apache.kafka.streams.kstream.internals.KStreamWindowReduce$KStreamAggregateValueGetter.get(KStreamWindowReduce.java:147)
    at org.apache.kafka.streams.kstream.internals.KTableKTableJoin$KTableKTableJoinProcessor.process(KTableKTableJoin.java:77)
    at org.apache.kafka.streams.kstream.internals.KTableKTableJoin$KTableKTableJoinProcessor.process(KTableKTableJoin.java:48)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
    at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
    at org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate$KStreamWindowAggregateProcessor.process(KStreamWindowAggregate.java:136)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
    at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:64)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:174)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:320)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)

Looks like the join is throwing the exception. Any ideas?
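
One possible reading of the trace, stated as an assumption rather than a confirmed diagnosis: KTableKTableJoinProcessor.process calls KStreamAggregateValueGetter.get, which calls next() on a window-store iterator, and that iterator has no matching entry. The plain-Java sketch below only illustrates that general failure mode (next() on an empty iterator versus a hasNext() guard). EmptyIteratorDemo and its methods are hypothetical and are not Kafka code.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical stand-in for a store lookup that returns an iterator of matches;
// this is not Kafka code.
class EmptyIteratorDemo {
    // Unguarded lookup: throws NoSuchElementException when nothing matched.
    static long unguardedGet(Iterator<Long> matches) {
        return matches.next();
    }

    // Guarded lookup: returns null when nothing matched.
    static Long guardedGet(Iterator<Long> matches) {
        return matches.hasNext() ? matches.next() : null;
    }

    public static void main(String[] args) {
        // The guarded version tolerates an empty result.
        System.out.println(guardedGet(Collections.<Long>emptyIterator())); // null

        // The unguarded version fails the same way the trace does.
        try {
            unguardedGet(Collections.<Long>emptyIterator());
        } catch (NoSuchElementException e) {
            System.out.println("NoSuchElementException");
        }
    }
}
```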

Thanks,
Tim
