Hi Marco,

Your config, etc., looks OK.

1. Unfortunately, it is pretty hard to tell what is going on from just your
code below. But the behaviour doesn't seem to be in line with what I'm
reading in the Streams code. For example, your MySession::new function
should be called once per record, and the aggregator and merger should be
called pretty much immediately after that.
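
To make that expected call pattern concrete, here is a minimal single-key sketch in plain Java. It is a simplified model, not Kafka Streams code: the class, the count-based aggregate and the overlap test are assumptions for illustration only. In this model each record triggers one initializer call and one aggregator call, plus one merger call per existing session it overlaps, so the initializer count tracks the record count instead of exceeding it by orders of magnitude.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/**
 * Hypothetical, single-key model of the session-window callback pattern:
 * initializer and aggregator once per record, merger once per overlapping session.
 * This is a simplified sketch, not Kafka Streams code.
 */
public class SessionAggregationSketch {

    /** A session window [start, end] holding a running record count. */
    public static final class Session {
        final long start;
        final long end;
        final long count;

        Session(long start, long end, long count) {
            this.start = start;
            this.end = end;
            this.count = count;
        }
    }

    // Call counters, so the ratio of callbacks to records is visible.
    public static int initializerCalls;
    public static int aggregatorCalls;
    public static int mergerCalls;

    static long initializer() { initializerCalls++; return 0L; }
    static long aggregator(long agg) { aggregatorCalls++; return agg + 1; }
    static long merger(long left, long right) { mergerCalls++; return left + right; }

    /** Feeds one record timestamp into the sessions of a single key. */
    static void process(List<Session> sessions, long ts, long gapMs) {
        long agg = initializer();                 // once per record
        long start = ts;
        long end = ts;
        Iterator<Session> it = sessions.iterator();
        while (it.hasNext()) {
            Session s = it.next();
            // The record joins any session lying within the inactivity gap of it.
            if (s.end >= ts - gapMs && s.start <= ts + gapMs) {
                agg = merger(agg, s.count);       // once per overlapping session
                start = Math.min(start, s.start);
                end = Math.max(end, s.end);
                it.remove();                      // replaced by the merged session below
            }
        }
        agg = aggregator(agg);                    // once per record
        sessions.add(new Session(start, end, agg));
    }

    /** Resets the counters and runs all timestamps through a fresh session list. */
    public static List<Session> simulate(long[] timestamps, long gapMs) {
        initializerCalls = 0;
        aggregatorCalls = 0;
        mergerCalls = 0;
        List<Session> sessions = new ArrayList<>();
        for (long ts : timestamps) {
            process(sessions, ts, gapMs);
        }
        return sessions;
    }

    public static void main(String[] args) {
        long gapMs = 5 * 60 * 1000L; // 5-minute inactivity gap, as in the original config
        List<Session> sessions = simulate(new long[] {0L, 60_000L, 120_000L, 1_000_000L}, gapMs);
        System.out.printf("records=4 sessions=%d initializer=%d aggregator=%d merger=%d%n",
                sessions.size(), initializerCalls, aggregatorCalls, mergerCalls);
        // prints: records=4 sessions=2 initializer=4 aggregator=4 merger=2
    }
}
```

Four records produce four initializer calls; if MySession::new fires hundreds of times more often than records arrive, something other than normal per-record processing is likely involved.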

2. Data will be retained for a bit longer than the value used in
SessionWindows.until(..). The session store has 3 segments, and we use the
retention period (i.e., the value of until()) to determine the segment
length. The segment length is calculated as:

 Math.max(retentionPeriod / (numSegments - 1), MIN_SEGMENT_INTERVAL);

With until() at 5 + 2 = 7 minutes (420000 ms), that is 420000 / 2 = 210000
milliseconds (3.5 minutes). So maintaining 3 segments means there could be
data that is about 10 minutes old (3 x 3.5 = 10.5 minutes).
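
The same arithmetic as a small Java sketch. The class and method names are hypothetical, MIN_SEGMENT_INTERVAL_MS = 60_000 is an assumed value for the minimum segment interval, and maxRetainedSpanMs (numSegments x segment length) is only the rough upper bound used in the reasoning above, not an exact expiry rule.

```java
/**
 * Hypothetical helper reproducing the segment-length arithmetic for the session store.
 * MIN_SEGMENT_INTERVAL_MS is an assumed value, not taken from the Kafka source.
 */
public class SessionStoreSegments {
    static final long MIN_SEGMENT_INTERVAL_MS = 60_000L; // assumption: one minute
    static final int NUM_SEGMENTS = 3;

    /** Segment length: max(retentionPeriod / (numSegments - 1), MIN_SEGMENT_INTERVAL). */
    public static long segmentInterval(long retentionPeriodMs, int numSegments) {
        return Math.max(retentionPeriodMs / (numSegments - 1), MIN_SEGMENT_INTERVAL_MS);
    }

    /** Rough upper bound on the age of data still held: numSegments * segment length. */
    public static long maxRetainedSpanMs(long retentionPeriodMs, int numSegments) {
        return segmentInterval(retentionPeriodMs, numSegments) * numSegments;
    }

    public static void main(String[] args) {
        long untilMs = 5 * 60 * 1000L + 2 * 60 * 1000L; // until(5 min + 2 min) = 420,000 ms
        System.out.println(segmentInterval(untilMs, NUM_SEGMENTS));   // 210000 (3.5 minutes)
        System.out.println(maxRetainedSpanMs(untilMs, NUM_SEGMENTS)); // 630000 (10.5 minutes)
    }
}
```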

Also, this is completely driven by the data, specifically by the timestamps
extracted from the data. Could you provide a sample of the data going
through the system? It might help in debugging the issue (I'm not seeing
anything obvious in the code). It might also help if you can get some
stack traces from the Streams instances that appear to be stuck.
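
A rough sketch of what "driven by the data" means, using a simplified model rather than the real store: the class is hypothetical, and it assumes segment ids are timestamp / segmentInterval and that only the newest numSegments ids are kept. Old segments are dropped only when a record with a newer timestamp arrives; wall-clock time plays no part, so if timestamps stop advancing nothing expires, and a record whose segment has already been dropped is rejected.

```java
import java.util.TreeSet;

/**
 * Hypothetical, simplified model of a time-segmented store whose expiry is driven
 * purely by record timestamps ("stream time"). Not Kafka Streams code.
 */
public class StreamTimeSegmentsSketch {
    private final long segmentIntervalMs;
    private final int numSegments;
    private final TreeSet<Long> liveSegments = new TreeSet<>();
    private long maxSegmentId = -1L; // highest segment id seen so far

    public StreamTimeSegmentsSketch(long segmentIntervalMs, int numSegments) {
        this.segmentIntervalMs = segmentIntervalMs;
        this.numSegments = numSegments;
    }

    /** Stores a record timestamp; returns false if its segment has already expired. */
    public boolean put(long timestampMs) {
        long segmentId = timestampMs / segmentIntervalMs;
        if (segmentId <= maxSegmentId - numSegments) {
            return false; // too old relative to the newest timestamp seen
        }
        liveSegments.add(segmentId);
        if (segmentId > maxSegmentId) {
            maxSegmentId = segmentId;
            // Expiry happens only here, i.e. only when a newer timestamp arrives.
            liveSegments.headSet(maxSegmentId - numSegments, true).clear();
        }
        return true;
    }

    public int liveSegmentCount() {
        return liveSegments.size();
    }

    public static void main(String[] args) {
        StreamTimeSegmentsSketch store = new StreamTimeSegmentsSketch(210_000L, 3);
        store.put(0L);        // segment 0
        store.put(420_000L);  // segment 2
        System.out.println(store.liveSegmentCount()); // 2: nothing expires without newer data
        store.put(630_000L);  // segment 3, so segment 0 is dropped
        System.out.println(store.liveSegmentCount()); // 2 (segments 2 and 3)
        System.out.println(store.put(50_000L));       // false: segment 0 has expired
    }
}
```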

Thanks,
Damian
On Mon, 6 Mar 2017 at 09:59 Marco Abitabile <marco.abitab...@gmail.com> wrote:

> Hello,
>
> I'm playing around with the brand new SessionWindows. I have a simple
> topology such as:
>
> KStream<String, JsonObject> sess =
>  builder.stream(stringSerde, jsonSerde, SOURCE_TOPIC);
> sess
>     .map(MySession::enhanceWithUserId_And_PutUserIdAsKey)
>     .groupByKey(stringSerde, jsonSerde)
>     .aggregate(
>         MySession::new,
>         MySession::aggregateSessions,
>         MySession::mergeSessions,
>         SessionWindows
>             .with(WINDOW_INACTIVITY_GAPS_MS)
>             .until(WINDOW_MAINTAIN_DURATION_MS),
>         mySessionSerde,
>         "my-session-store") // store name is illustrative
>     .filter(MySession::filterOutZeroLenghtSessions)
>     .to(windowedSerde, mySessionSerde, SINK_TOPIC_KTABLE);
>
> These are the most important configurations I'm using; all the other
> configs are the usual serde and host props:
>
> private static final long WINDOW_INACTIVITY_GAPS_MS = 5 * 60 * 1000L; // 5 minutes
> private static final long WINDOW_MAINTAIN_DURATION_MS =
>     WINDOW_INACTIVITY_GAPS_MS + 2 * 60 * 1000L; // 5 minutes + 2 minutes
>
> private static final Properties props = new Properties();
>
> props.put(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG,
> ONE_DAY);
>
> The source stream has data arriving at around 100 messages/second.
>
> I'm experiencing these behaviours:
>
> 1) MySession::new is called thousands of times, way more than the number
> of messages ingested (around 100 to 1000 times more). Most of these
> sessions never reach the end of the pipeline (even if I remove
> .filter(MySession::filterOutZeroLenghtSessions)), and neither
> MySession::aggregateSessions nor MySession::mergeSessions is invoked.
>
> Is this correct? I don't understand; maybe I've set something up wrong...
>
> 2) I can see that the stream pipeline ingests the first 15 minutes of
> data, and the sessions that reach SINK_TOPIC_KTABLE look good. However:
>    - with every second that passes, the pipeline gets slower and slower;
>    - I can see new updates to old sessions even after the
> .until(WINDOW_MAINTAIN_DURATION_MS) period;
>    - the stream consumer ingests new data at slower and slower rates as
> time passes, eventually reaching almost 0 msg/sec.
>
> I was expecting that after WINDOW_MAINTAIN_DURATION_MS I would see only
> new sessions, and that sessions that have already been fired would just be
> removed from the session store and never touched again.
>
>
> At the beginning I thought my pipeline was not set up correctly; however,
> I've tried to follow the docs slavishly and could not find where things
> could go wrong.
>
> Do you have any hints about this?
> Please let me know if you need more info.
>
> Thanks a lot,
> Marco
>
