Hello, I'm playing around with the brand-new SessionWindows. I have a simple topology like the following:
    KStream<String, JsonObject> sess = builder.stream(stringSerde, jsonSerde, SOURCE_TOPIC);
    sess
        .map(MySession::enhanceWithUserId_And_PutUserIdAsKey)
        .groupByKey(stringSerde, jsonSerde)
        .aggregate(
            MySession::new,
            MySession::aggregateSessions,
            MySession::mergeSessions,
            SessionWindows
                .with(WINDOW_INACTIVITY_GAPS_MS)
                .until(WINDOW_MAINTAIN_DURATION_MS)
            /* , remaining aggregate() arguments omitted here */)
        .filter(MySession::filterOutZeroLenghtSessions)
        .to(windowedSerde, mySessionSerde, SINK_TOPIC_KTABLE);

These are the most important settings I'm using; all the other configs are the usual serdes and host properties:

    private static final long WINDOW_INACTIVITY_GAPS_MS = TimeUnit.MINUTES.toMillis(5);
    private static final long WINDOW_MAINTAIN_DURATION_MS = WINDOW_INACTIVITY_GAPS_MS + TimeUnit.MINUTES.toMillis(2);
    private static final Properties props = new Properties();
    static {
        props.put(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, TimeUnit.DAYS.toMillis(1));
    }

The source stream receives around 100 messages/second, and I'm seeing the following behaviour:

1) MySession::new is called thousands of times, far more than the number of messages ingested (roughly 100 to 1000 times more). Most of these sessions never reach the end of the pipeline (even if I remove .filter(MySession::filterOutZeroLenghtSessions)), and neither MySession::aggregateSessions nor MySession::mergeSessions is invoked for them. Is this correct? I don't understand it; maybe I've set something up wrong...

2) The pipeline can ingest the first 15 minutes of data, and the sessions that reach SINK_TOPIC_KTABLE look good. However:
- with every second that passes, the pipeline gets slower and slower;
- I can see new updates to old sessions even after the .until(WINDOW_MAINTAIN_DURATION_MS) period;
- the stream consumer ingests new data at slower and slower rates as time passes, eventually reaching almost 0 msg/sec.

I was expecting that after WINDOW_MAINTAIN_DURATION_MS I would see only new sessions, and that sessions that have already been emitted would simply be removed from the session store and never touched again.
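A rough plain-Java model (no Kafka dependency) of the expected behaviour described above: per-key session windows with a 5-minute inactivity gap and a 7-minute retention, mirroring WINDOW_INACTIVITY_GAPS_MS and WINDOW_MAINTAIN_DURATION_MS. It is a simplified sketch under stated assumptions, not how Kafka Streams implements its session store: the SessionWindowSketch and Session classes, the record count standing in for MySession, the single initializer call per record, and the stream-time-based purge/drop rule are all hypothetical.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical, simplified single-key model of session windowing.
// NOT Kafka Streams' implementation; it only illustrates the expected semantics.
class SessionWindowSketch {

    static final class Session {
        long start, end;
        long count; // stand-in for the MySession aggregate (assumption)

        Session(long start, long end, long count) {
            this.start = start;
            this.end = end;
            this.count = count;
        }

        @Override
        public String toString() {
            return "[" + start + "," + end + "] count=" + count;
        }
    }

    final long gapMs;
    final long retentionMs;
    final List<Session> store = new ArrayList<>();
    long streamTime = Long.MIN_VALUE;
    long initializerCalls, aggregatorCalls, mergerCalls, dropped;

    SessionWindowSketch(long gapMs, long retentionMs) {
        // This model requires retention >= gap.
        if (retentionMs < gapMs) throw new IllegalArgumentException("retention must be >= gap");
        this.gapMs = gapMs;
        this.retentionMs = retentionMs;
    }

    void process(long ts) {
        streamTime = Math.max(streamTime, ts);
        // Records older than the retention horizon are dropped instead of reopening purged sessions.
        if (ts < streamTime - retentionMs) {
            dropped++;
            return;
        }
        // "Initializer": in this model, one fresh aggregate per processed record.
        Session merged = new Session(ts, ts, 0);
        initializerCalls++;
        // Merge every stored session whose [start - gap, end + gap] range contains ts.
        for (Iterator<Session> it = store.iterator(); it.hasNext(); ) {
            Session s = it.next();
            if (s.end + gapMs >= ts && s.start - gapMs <= ts) {
                merged.start = Math.min(merged.start, s.start);
                merged.end = Math.max(merged.end, s.end);
                merged.count += s.count; // "merger"
                mergerCalls++;
                it.remove();
            }
        }
        merged.count++; // "aggregator": fold the current record in
        aggregatorCalls++;
        store.add(merged);
        // Purge sessions that ended before the retention horizon.
        store.removeIf(s -> s.end < streamTime - retentionMs);
    }

    public static void main(String[] args) {
        final long MIN = 60_000L;
        SessionWindowSketch w = new SessionWindowSketch(5 * MIN, 7 * MIN);
        long[] timestamps = {0, 1 * MIN, 3 * MIN, 20 * MIN, 21 * MIN, 2 * MIN};
        for (long ts : timestamps) w.process(ts);
        System.out.println(w.store);
        System.out.printf("records=%d initializer=%d aggregator=%d merger=%d dropped=%d%n",
                timestamps.length, w.initializerCalls, w.aggregatorCalls, w.mergerCalls, w.dropped);
    }
}
```

With these timestamps the first session ([0, 3 min]) is purged once stream time reaches 20 minutes, the late record at 2 minutes is dropped instead of reopening it, and the initializer and aggregator counters stay at one call per processed record (5 each, with 3 merges), nothing like the 100 to 1000x ratio observed for MySession::new.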
At first I thought my pipeline was not set up correctly; however, I've tried to follow the docs slavishly and could not find where things could go wrong. Do you have any hints about this? Please let me know if you need more info.

Thanks a lot,
Marco