I have the following code in my Kafka Streams application:

    .groupBy((_, myobj) => myobj.getId)(Grouped.`with`[String, Myobj])
    .windowedBy(SessionWindows.`with`(Duration.ofMillis(10 * 60 * 1000)))
    .count()
    .toStream
    .map((k, v) => (k.key(), v))
    .to("kafka-streams-test")


Expectation: if no messages arrive for an ID for 10 minutes, the session
should time out and a new message containing 'Id, Count of messages' should
be emitted.
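
To make the expectation concrete, here is a minimal sketch in plain Scala (no Kafka dependency) of the session-gap grouping described above. It is not how Kafka Streams implements session windows; `Event`, `SessionSketch` and `sessionCounts` are hypothetical names used only for illustration. It assumes sessions are split on record timestamps rather than wall-clock time, and that a gap exactly equal to the limit still belongs to the same session.

```scala
object SessionSketch {
  // Hypothetical record type: an ID plus the record's timestamp in milliseconds.
  final case class Event(id: String, timestampMs: Long)

  // Same 10-minute inactivity gap as in the topology above.
  val GapMs: Long = 10 * 60 * 1000L

  // For each ID, returns the message count of every session in time order.
  def sessionCounts(events: Seq[Event], gapMs: Long): Map[String, List[Int]] =
    events.groupBy(_.id).map { case (id, evs) =>
      val times = evs.map(_.timestampMs).sorted
      // Walk the sorted timestamps; open a new session when the gap is exceeded.
      val (counts, _) =
        times.foldLeft((List.empty[Int], Option.empty[Long])) {
          case ((current :: done, Some(last)), t) if t - last <= gapMs =>
            ((current + 1) :: done, Some(t)) // still inside the current session
          case ((acc, _), t) =>
            (1 :: acc, Some(t)) // first record, or gap exceeded: new session
        }
      id -> counts.reverse
    }

  def main(args: Array[String]): Unit = {
    val minute = 60 * 1000L
    val events = Seq(
      Event("200", 0L),
      Event("200", 5 * minute),  // within 10 minutes of the previous record
      Event("200", 20 * minute), // 15 minutes later: starts a second session
      Event("201", 1 * minute)
    )
    sessionCounts(events, GapMs).foreach { case (id, counts) =>
      println("Id: " + id + " Session counts: " + counts.mkString(", "))
    }
  }
}
```

Under these assumptions, ID 200 ends up with two sessions (counts 2 and 1) and ID 201 with one session (count 1), which is the kind of per-session result expected on the output topic.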

What I see is this:
1) The session times out immediately. (I tried changing
StreamsConfig.APPLICATION_ID_CONFIG, hoping that a new session would begin
on every run, but that is not happening.)
2) On the output topic "kafka-streams-test", I see NULL messages.
3) Instead of writing to the output topic, I tried writing to the console as
follows:

    source.foreach((k, v) => println("Id: " + k + " Count: " + v))

This prints messages such as these:

Id: 200 Count: 1
Id: 201 Count: 1

What am I doing wrong? Please advise. Thanks.
