Kafka Streams uses an incremental computation model, i.e., each time a new message arrives, the current session is updated immediately (note that the result is a `KTable`). Kafka Streams does not accumulate the raw events and compute the session result when the window closes; instead, it refines the session result each time a new record comes in (or creates a new session if necessary).
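To illustrate the incremental model, here is a small stand-alone simulation in plain Java with no Kafka dependency. It is a simplified model and not Kafka Streams' actual implementation: the class `SessionMergeSketch` and its `process` method are hypothetical names, and it only tracks sessions in memory for whatever key you pass in. Each incoming timestamp either starts a new session or is merged into the sessions within the inactivity gap, and every update is emitted right away; a replaced session shows up as a `null` value under its old window.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Simplified, in-memory model of incremental session aggregation (hypothetical sketch).
class SessionMergeSketch {

    // One session: [start, end] in milliseconds plus the running record count.
    static final class Session {
        final long start;
        final long end;
        final long count;

        Session(long start, long end, long count) {
            this.start = start;
            this.end = end;
            this.count = count;
        }
    }

    private final long gapMs;
    private final List<Session> sessions = new ArrayList<>();

    SessionMergeSketch(long gapMs) {
        this.gapMs = gapMs;
    }

    // Processes one record and returns the updates a downstream consumer would see in this model.
    List<String> process(String key, long ts) {
        List<String> out = new ArrayList<>();
        long start = ts;
        long end = ts;
        long count = 0;
        Iterator<Session> it = sessions.iterator();
        while (it.hasNext()) {
            Session s = it.next();
            // A session within the inactivity gap of the new record is merged into it.
            if (s.end >= ts - gapMs && s.start <= ts + gapMs) {
                start = Math.min(start, s.start);
                end = Math.max(end, s.end);
                count += s.count;
                it.remove();
                // The old session is deleted: emitted as <old window, null>.
                out.add(key + "@[" + s.start + "," + s.end + "] -> null");
            }
        }
        // The merged (or brand-new) session is inserted with the updated count.
        Session merged = new Session(start, end, count + 1);
        sessions.add(merged);
        out.add(key + "@[" + start + "," + end + "] -> " + merged.count);
        return out;
    }

    public static void main(String[] args) {
        SessionMergeSketch sketch = new SessionMergeSketch(10 * 60 * 1000L);
        for (long ts : new long[] {0L, 60_000L, 120_000L}) {
            sketch.process("200", ts).forEach(System.out::println);
        }
    }
}
```

With a 10-minute gap and records at 0, 60000 and 120000 ms, this prints `200@[0,0] -> 1`, then `200@[0,0] -> null` and `200@[0,60000] -> 2`, then `200@[0,60000] -> null` and `200@[0,120000] -> 3`.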
As you call `toStream()`, you get a new record each time the row (that represents the session) in the table is updated. Because new records change the session boundaries (and the session window is part of the key in the table), the updates are implemented as a delete of the old session and an insert of a new session into the result table. That is why you see <key,null> messages.

If you only want to get the final result, you can use the `suppress()` operator:

    .count().suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded())).toStream()

`suppress()` creates a new result table that only contains sessions that won't update any longer, because the session grace period has passed.

Btw: the default grace period is 24h, and you might want to change the default via `SessionWindows#grace()` to a lower value; otherwise, you would see the result only after 24 hours.

For more details, check out this blog post: https://www.confluent.io/blog/kafka-streams-take-on-watermarks-and-triggers/

-Matthias

On 12/1/20 4:46 PM, Eric Beabes wrote:
> I've following code in my Kafka Streams application:
>
> .groupBy((_, myobj) => myobj.getId)(Grouped.`with`[String, Myobj])
> .windowedBy(SessionWindows.`with`(Duration.ofMillis(10 * 60 * 1000)))
> .count()
> .toStream
> .map((k,v) => (k.key(), v))
> .to("kafka-streams-test")
>
> Expectation: If messages don't come in for an ID for a period of 10 minutes
> session should time out & a new message with 'Id, Count of messages' should
> get printed.
>
> What I see is this:
> 1) Session gets timed out immediately. (I tried resetting
> StreamsConfig.APPLICATION_ID_CONFIG with the hope that every time a new
> session will begin but that's not happening!)
> 2) On the output topic "kafka-streams-test", I see NULL messages.
> 3) Instead of writing to the output topic I tried writing to console as
> follows:
>
> source.foreach((k, v) => println("Id: " + k + " Count: " + v))
>
> This prints messages such as these:
>
> Id: 200 Count: 1
> Id: 201 Count: 1
>
> What am I doing wrong? Please advise. Thanks.
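A minimal sketch of the `suppress()` suggestion from the reply above, written against the Java DSL as it looked in the Kafka 2.x releases current at the time of this thread. The class name `SessionCountTopology`, the input topic `input-topic`, and the one-minute grace period are assumptions for illustration; only the output topic `kafka-streams-test` and the 10-minute gap come from the original code. It also assumes String keys and values, with the record key already being the ID, and it needs the `kafka-streams` library on the classpath.

```java
import java.time.Duration;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.Suppressed.BufferConfig;

// Hypothetical topology: counts records per ID and session, emitting only final counts.
public class SessionCountTopology {

    public static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();

        builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String())) // assumed topic name
            .groupByKey()
            // 10-minute inactivity gap as in the question; the 1-minute grace is an assumed value
            // that replaces the 24h default mentioned in the reply.
            .windowedBy(SessionWindows.with(Duration.ofMinutes(10)).grace(Duration.ofMinutes(1)))
            .count()
            // Hold back intermediate updates (including the <key,null> deletes) until the window closes.
            .suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()))
            .toStream()
            // Drop the window from the key so the output is <id, count>.
            .map((windowedKey, count) -> KeyValue.pair(windowedKey.key(), count))
            .to("kafka-streams-test", Produced.with(Serdes.String(), Serdes.Long()));

        return builder.build();
    }
}
```

Two caveats on this sketch: in Kafka Streams 3.0 and later, `SessionWindows.with(...)` and `grace(...)` are deprecated in favor of `SessionWindows.ofInactivityGapAndGrace(...)`, and a suppressed result is only emitted once stream time advances past the end of the session plus the grace period, which requires newer records to arrive.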