Kafka Streams uses an incremental computation model, ie, each time a new
message arrives, the current session is updated immediately (note that
the result is a `KTable`). -- Kafka Streams does not accumulate the raw
events and computes the session result when the window closes, but it
refines the session result each time a new record comes in (or create
new session if necessary).

As you call `toStream()` you get a new record each time the row (that
represent the session) in the table is updated. Because new records
change the session boundaries (and the session window is used a key in
the table), the updates are implemented as a delete of the old session
and an insert into the result table as a new session. That is why you
see <key,null> messages.

If you only want to get the final result, you can use the `suppress()`
operator:

.count().suppress().toStream()

`suppress()` creates a new result table that only contains sessions that
won't update any longer, because the session grace period passed. Btw:
the default grace period is 24h, and you might want to change the
default via `SessionWindows#grace()` to a lower value -- otherwise, you
would see the result only after 24h hours.

For more details, check out this blog post:
https://www.confluent.io/blog/kafka-streams-take-on-watermarks-and-triggers/


-Matthias

On 12/1/20 4:46 PM, Eric Beabes wrote:
> I've following code in my Kafka Streams application:
> 
> *.groupBy((_, myobj) => myobj.getId)(Grouped.`with`[String, Myobj])*
> 
> *.windowedBy(SessionWindows.`with`(Duration.ofMillis(10 * 60 * 1000)))*
> 
> *.count()*
> 
> *.toStream*
> 
> *.map((k,v) => (k.key(), v))*
> 
> *.to("kafka-streams-test")*
> 
> 
> Expectation: If messages don't come in for an ID for a period of 10 minutes
> session should time out & a new message with 'Id, Count of messages' should
> get printed.
> 
> What I see is this:
> 1) Session gets timed out immediately. (I tried resetting
> StreamsConfig.APPLICATION_ID_CONFIG with the hope that every time a new
> session will begin but that's not happening!)
> 2) On the output topic "kafka-streams-test", I see NULL messages.
> 3) Instead of writing to the output topic I tried writing to console as
> follows:
> 
> *source.foreach((k, v) => println("Id: " + k + " Count: " + v))*
> 
> This prints messages such as these:
> 
> Id: 200 Count: 1
> Id: 201 Count: 1
> 
> What am I doing wrong? Please advise. Thanks.
> 

Reply via email to