I've following code in my Kafka Streams application:
*.groupBy((_, myobj) => myobj.getId)(Grouped.`with`[String, Myobj])*
*.windowedBy(SessionWindows.`with`(Duration.ofMillis(10 * 60 * 1000)))*
*.count()*
*.toStream*
*.map((k,v) => (k.key(), v))*
*.to("kafka-streams-test")*
Expectation: If messages don't come in for an ID for a period of 10 minutes
session should time out & a new message with 'Id, Count of messages' should
get printed.
What I see is this:
1) Session gets timed out immediately. (I tried resetting
StreamsConfig.APPLICATION_ID_CONFIG with the hope that every time a new
session will begin but that's not happening!)
2) On the output topic "kafka-streams-test", I see NULL messages.
3) Instead of writing to the output topic I tried writing to console as
follows:
*source.foreach((k, v) => println("Id: " + k + " Count: " + v))*
This prints messages such as these:
Id: 200 Count: 1
Id: 201 Count: 1
What am I doing wrong? Please advise. Thanks.