Hi Marco,

I've done some testing and found that there is a performance issue when caching is enabled. I suspect this might be what you are hitting.
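If you want to confirm that the cache is the culprit before changing anything, I believe you can effectively switch the record cache off globally and see whether throughput recovers. A minimal sketch, assuming the props object you already build for your streams config (treat this as a diagnostic rather than a fix):

    // Diagnostic only: a cache size of 0 effectively disables the streams
    // record cache, so you can check whether caching is causing the slowdown.
    props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);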
It looks to me like you can work around this by doing something like:

    final StateStoreSupplier<SessionStore> sessionStore =
        Stores.create("session-store-name")
            .withKeys(Serdes.String())
            .withValues(mySessionSerde)
            .persistent()
            .sessionWindowed(TimeUnit.MINUTES.toMillis(7))
            .build();

Then, in your call to aggregate, pass in the sessionStore created above, i.e.:

    aggregate(
        MySession::new,
        MySession::aggregateSessions,
        MySession::mergeSessions,
        SessionWindows.with(WINDOW_INACTIVITY_GAPS_MS),
        mySessionSerde,
        sessionStore)
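In case it helps to see it end to end, here is a rough sketch of how the explicit store slots into the topology from your first mail. The serdes, MySession methods and constants are the ones from your own snippets, and I'm assuming WINDOW_INACTIVITY_GAPS_MS is a long holding milliseconds:

    // Sketch: your existing topology, with the explicit session store
    // passed to aggregate() instead of letting streams create one for you.
    KStream<String, JsonObject> sess =
        builder.stream(stringSerde, jsonSerde, SOURCE_TOPIC);
    sess
        .map(MySession::enhanceWithUserId_And_PutUserIdAsKey)
        .groupByKey(stringSerde, jsonSerde)
        .aggregate(
            MySession::new,
            MySession::aggregateSessions,
            MySession::mergeSessions,
            SessionWindows.with(WINDOW_INACTIVITY_GAPS_MS),
            mySessionSerde,
            sessionStore)   // the store created above
        .filter(MySession::filterOutZeroLenghtSessions)
        .to(windowedSerde, mySessionSerde, SINK_TOPIC_KTABLE);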
Let us know how you get on.

Thanks,
Damian

On Mon, 6 Mar 2017 at 13:27 Marco Abitabile <marco.abitab...@gmail.com> wrote:

> Thanks Damian,
>
> sure, you are right, these details are modified to be compliant with my
> company rules. However, the main points are unchanged.
>
> The producer of the original data is a "data ingestor" that attaches a
> few extra fields and produces a message such as:
>
> row = new JsonObject({
>     "id": 12345654,
>     "userDeviceId": "",
>     "creationTime": 1488801350660,   // produced by the remote source
>     "receivedTime": 1488801363455,   // set by my data ingestor
>     "extra_data1": 123,              // extra data specific to my domain,
>     "extra_data2": 456,              // all numeric values
>     "extra_data3": 789
> })
>
> It then sends records into SOURCE_TOPIC (which in this context is
> USER_ACTIVITIES_TOPIC) as follows:
>
> long creationTimestamp = row.getLong("creationTime");
> String rowId = String.valueOf(row.getLong("id"));
> ProducerRecord<String, String> producerRecord =
>     new ProducerRecord<>(USER_ACTIVITIES_TOPIC, 0, creationTimestamp,
>                          rowId, row.toString());
> producer.send(producerRecord);
>
> Noteworthy:
> - I'm using only one partition (right now; I'm still not in production
>   and I'm exploring the feature). In production I would use more
>   partitions.
> - The message is a simple string containing a JSON object (I'm not
>   using Avro or similar).
>
> In my streaming application:
>
> public class MySession {
>
>     private final JsonObject sessionDetails;
>
>     public MySession() {
>         this.sessionDetails = new JsonObject();
>     }
>
>     public static KeyValue enhanceWithUserId_And_PutUserIdAsKey(String k,
>             JsonObject j) {
>         int userId = cache.get(j.get("userDeviceId"));
>         return KeyValue.pair(userId, j);
>     }
>
>     public static MySession aggregate(String key, JsonObject value,
>             MySession aggregate) {
>         // basically MySession is a collection of all the raw data that
>         // the session is composed of
>         aggregate.addRawData(value);
>         return aggregate;
>     }
>
>     public static MySession merge(String key, MySession arg1,
>             MySession arg2) {
>         arg2.merge(arg1);
>         return arg2;
>     }
> }
>
> BTW (this will be a topic for another thread anyway...) is there a way
> to be in control of the MySession lifecycle? I was thinking of pooling
> them to reduce GC workload.
>
> thanks a lot for your precious help.
>
> Marco
>
> 2017-03-06 11:59 GMT+01:00 Damian Guy <damian....@gmail.com>:
>
> > Hi Marco,
> >
> > Your config etc. look ok.
> >
> > 1. It is pretty hard to tell what is going on from just your code
> > below, unfortunately. But the behaviour doesn't seem to be in line
> > with what I'm reading in the streams code. For example, your
> > MySession::new function should be called once per record. The merger
> > and aggregator should be called pretty much immediately after that.
> >
> > 2. Data will be retained for a bit longer than the value used in
> > SessionWindows.until(..). The session store has 3 segments and we use
> > the retention period (i.e., the value of until()) to determine the
> > segment length. The segment length is calculated as:
> >
> >     Math.max(retentionPeriod / (numSegments - 1), MIN_SEGMENT_INTERVAL);
> >
> > which in this case is 210000 milliseconds. So maintaining 3 segments
> > means there could be data that is about 10 minutes old.
> >
> > Also, this is completely driven by the data, and specifically by the
> > time extracted from the data. I'm not sure if you can provide a sample
> > of the data going through the system? It might be helpful in trying to
> > debug the issue (I'm not seeing anything obvious in the code). It
> > might also help if you can get some stack traces on the streams
> > instances that appear to be stuck.
> >
> > Thanks,
> > Damian
> >
> > On Mon, 6 Mar 2017 at 09:59 Marco Abitabile <marco.abitab...@gmail.com>
> > wrote:
> >
> > > Hello,
> > >
> > > I'm playing around with the brand new SessionWindows. I have a
> > > simple topology such as:
> > >
> > > KStream<String, JsonObject> sess =
> > >     builder.stream(stringSerde, jsonSerde, SOURCE_TOPIC);
> > > sess
> > >     .map(MySession::enhanceWithUserId_And_PutUserIdAsKey)
> > >     .groupByKey(stringSerde, jsonSerde)
> > >     .aggregate(
> > >         MySession::new,
> > >         MySession::aggregateSessions,
> > >         MySession::mergeSessions,
> > >         SessionWindows
> > >             .with(WINDOW_INACTIVITY_GAPS_MS)
> > >             .until(WINDOW_MAINTAIN_DURATION_MS),
> > >         mySessionSerde)
> > >     .filter(MySession::filterOutZeroLenghtSessions)
> > >     .to(windowedSerde, mySessionSerde, SINK_TOPIC_KTABLE);
> > >
> > > These are the most important configurations I'm using; all the other
> > > configs are the usual serdes and host props:
> > >
> > > private static final long WINDOW_INACTIVITY_GAPS_MS = 5_MINUTES;
> > > private static final long WINDOW_MAINTAIN_DURATION_MS =
> > >     5_MINUTES + 2_MINUTES;
> > >
> > > private static final Properties props = new Properties();
> > > props.put(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG,
> > >     ONE_DAY);
> > >
> > > The source stream has data arriving at around 100 messages/second.
> > >
> > > I'm experiencing these behaviours:
> > >
> > > 1) MySession::new is called thousands of times, way more than the
> > > number of messages ingested (around 100 / 1000 times more). Most of
> > > these sessions never reach the end of the pipeline (even if I remove
> > > .filter(MySession::filterOutZeroLenghtSessions)), and neither
> > > MySession::aggregateSessions nor MySession::mergeSessions are
> > > invoked.
> > >
> > > Is this correct? I don't understand; maybe I've set up something
> > > wrong...
> > >
> > > 2) I can see that the stream pipeline can ingest the first 15
> > > minutes of data, and the sessions that reach SINK_TOPIC_KTABLE look
> > > good. However:
> > > - every second that passes, the pipeline gets slower and slower;
> > > - I can see new updates to old sessions even after the
> > >   .until(WINDOW_MAINTAIN_DURATION_MS) period;
> > > - the stream consumer starts to ingest new data at slower and slower
> > >   rates as time passes, eventually reaching almost 0 msg/sec.
> > >
> > > I was expecting that after WINDOW_MAINTAIN_DURATION_MS I would see
> > > only new sessions, and that sessions that have been fired would just
> > > be removed from the session store and never touched again.
> > >
> > > At first I thought my pipeline was not set up correctly; however,
> > > I've tried to follow the docs slavishly and I could not find where
> > > things could go wrong.
> > >
> > > Do you have some hints about this?
> > > Please let me know if you need more info.
> > >
> > > thanks a lot,
> > > Marco
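P.S. To put concrete numbers on the segment arithmetic in my earlier mail quoted above, a back-of-the-envelope check (the minimum segment interval is from memory, so treat it as an assumption; it doesn't change the result here since the first term is larger):

    // Rough check of the numbers, not the actual streams code.
    final long retentionPeriod = TimeUnit.MINUTES.toMillis(7); // until() = 5 + 2 minutes
    final int numSegments = 3;
    final long minSegmentInterval = 60_000L;                   // assumed minimum
    final long segmentLength =
        Math.max(retentionPeriod / (numSegments - 1), minSegmentInterval);
    // segmentLength == 210_000 ms; 3 segments * 210_000 ms ≈ 10.5 minutes,
    // which is why data up to roughly 10 minutes old can still be around.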