Thanks Damian. Sure, you are right: these details have been modified to comply with my company rules. However, the main points are unchanged.

The producer of the original data is a "data ingestor" that attaches a few extra fields and produces a message such as:

    row = new JsonObject({
        "id" : 12345654,
        "userDeviceId" : "",
        "creationTime" : 1488801350660,  // produced by the remote source
        "receivedTime" : 1488801363455,  // placed by my data ingestor
        "extra_data1" : 123,  //
        "extra_data2" : 456,  // extra data specific to my domain; all these values are numbers
        "extra_data3" : 789   //
    })

Then it sends the records into SOURCE_TOPIC (which in this context is USER_ACTIVITIES_TOPIC) as follows:

    long creationTimestamp = row.getLong("creationTime");
    // the key must be a String, since the record is a ProducerRecord<String, String>
    String rowId = String.valueOf(row.getLong("id"));
    ProducerRecord<String, String> producerRecord = new ProducerRecord<>(
            USER_ACTIVITIES_TOPIC, 0, creationTimestamp, rowId, row.toString());
    producer.send(producerRecord);

Noteworthy:
- I'm using only one partition (for now: I'm still not in production and I'm exploring the feature). In a production environment I would use more partitions.
- The message is a simple string containing a JSON object (I'm not using Avro or similar).
- In my streaming application:

    public class MySession {

        private final JsonObject sessionDetails;

        public MySession() {
            this.sessionDetails = new JsonObject();
        }

        public static KeyValue<String, JsonObject> enhanceWithUserId_And_PutUserIdAsKey(String k, JsonObject j) {
            // "cache" maps the device id to a user id (lookup details omitted)
            int userId = cache.get(j.get("userDeviceId"));
            // the key is turned into a String to match the stringSerde used by groupByKey
            return KeyValue.pair(String.valueOf(userId), j);
        }

        public static MySession aggregateSessions(String key, JsonObject value, MySession aggregate) {
            // MySession is basically a collection of all the raw data the session is composed of
            aggregate.addRawData(value);
            return aggregate;
        }

        public static MySession mergeSessions(String key, MySession arg1, MySession arg2) {
            arg2.merge(arg1);
            return arg2;
        }
    }

BTW (this will be a topic for another thread anyway...): is there a way to be in control of the MySession lifecycle? I was thinking of pooling the instances to reduce the GC workload.
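Just to make the idea concrete, this is the kind of pool I have in mind. It's only a rough sketch: MySessionPool, borrow() and release() are names I'm making up here, and it assumes MySession gets a clear() method to reset its state.

    import java.util.concurrent.ConcurrentLinkedQueue;

    // Hypothetical pool that recycles MySession instances instead of
    // allocating a fresh one for every record the initializer sees.
    public class MySessionPool {

        private final ConcurrentLinkedQueue<MySession> pool = new ConcurrentLinkedQueue<>();

        // Hand out a recycled instance if one is available, otherwise allocate.
        public MySession borrow() {
            MySession session = pool.poll();
            return session != null ? session : new MySession();
        }

        // Give an instance back once its session window is closed for good.
        public void release(MySession session) {
            session.clear(); // assumes MySession exposes a reset method
            pool.offer(session);
        }
    }

The part I don't see is where release() could be hooked, since the Streams runtime owns the aggregate's lifecycle.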
thanks a lot for your precious help.

Marco

2017-03-06 11:59 GMT+01:00 Damian Guy <damian....@gmail.com>:

> Hi Marco,
>
> Your config etc. looks OK.
>
> 1. It is pretty hard to tell what is going on from just your code below,
> unfortunately. But the behaviour doesn't seem to be in line with what I'm
> reading in the streams code. For example, your MySession::new function
> should be called once per record. The merger and aggregator should be
> called pretty much immediately after that.
>
> 2. Data will be retained for a bit longer than the value used in
> SessionWindows.until(..). The session store has 3 segments and we use the
> retention period (i.e., the value of until()) to determine the segment
> length. The segment length is calculated as:
>
>     Math.max(retentionPeriod / (numSegments - 1), MIN_SEGMENT_INTERVAL);
>
> which in this case is 210000 milliseconds. So maintaining 3 segments means
> there could be data that is about 10 minutes old.
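> To spell out the arithmetic with your numbers (assuming
> MIN_SEGMENT_INTERVAL is the 60000 ms constant used by the segmented
> session store):
>
>     long retentionPeriod = 420_000L;   // until() = 5 minutes + 2 minutes
>     int numSegments = 3;
>     long minSegmentInterval = 60_000L; // assumed value of the constant
>
>     long segmentLength =
>             Math.max(retentionPeriod / (numSegments - 1), minSegmentInterval);
>     // segmentLength = max(210000, 60000) = 210000 ms
>     // 3 segments * 210000 ms = 630000 ms, i.e. roughly 10.5 minutes of data
>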
> Also this is completely driven by the data, and specifically by the time
> extracted from the data. I'm not sure if you can provide a sample of the
> data going through the system? It might be helpful in trying to debug the
> issue (I'm not seeing anything obvious in the code).
> Also it might help if you can get some stack traces on the streams
> instances that appear to be stuck.
>
> Thanks,
> Damian
>
> On Mon, 6 Mar 2017 at 09:59 Marco Abitabile <marco.abitab...@gmail.com>
> wrote:
>
> > Hello,
> >
> > I'm playing around with the brand new SessionWindows. I have a simple
> > topology such as:
> >
> >     KStream<String, JsonObject> sess =
> >             builder.stream(stringSerde, jsonSerde, SOURCE_TOPIC);
> >     sess
> >             .map(MySession::enhanceWithUserId_And_PutUserIdAsKey)
> >             .groupByKey(stringSerde, jsonSerde)
> >             .aggregate(
> >                     MySession::new,
> >                     MySession::aggregateSessions,
> >                     MySession::mergeSessions,
> >                     SessionWindows
> >                             .with(WINDOW_INACTIVITY_GAPS_MS)
> >                             .until(WINDOW_MAINTAIN_DURATION_MS))
> >             .filter(MySession::filterOutZeroLenghtSessions)
> >             .to(windowedSerde, mySessionSerde, SINK_TOPIC_KTABLE);
> >
> > These are the most important configurations I'm using; all the other
> > configs are the classical serdes and hosts props:
> >
> >     private static final long WINDOW_INACTIVITY_GAPS_MS = 5_MINUTES;
> >     private static final long WINDOW_MAINTAIN_DURATION_MS =
> >             5_MINUTES + 2_MINUTES;
> >
> >     private static final Properties props = new Properties();
> >     props.put(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG,
> >             ONE_DAY);
> >
> > The source stream has data arriving at around 100 messages/second.
> >
> > I'm experiencing these behaviours:
> >
> > 1) MySession::new is called thousands of times, way more than the number
> > of messages ingested (around 100 to 1000 times more). Most of these
> > sessions never reach the end of the pipeline (even if I remove
> > .filter(MySession::filterOutZeroLenghtSessions)), and neither
> > MySession::aggregateSessions nor MySession::mergeSessions are invoked.
> >
> > Is this correct? I don't understand; maybe I've set something up wrong...
> >
> > 2) I can see that the stream pipeline ingests the first 15 minutes of
> > data, and the sessions that reach SINK_TOPIC_KTABLE look good. However:
> > - every second that passes, the pipeline gets slower and slower;
> > - I can see new updates to old sessions even after the
> > .until(WINDOW_MAINTAIN_DURATION_MS) period;
> > - the stream consumer ingests new data at slower and slower rates as
> > time passes, eventually reaching almost 0 msg/sec.
> >
> > I was expecting that after WINDOW_MAINTAIN_DURATION_MS I would see only
> > new sessions, and that the sessions already fired would just be removed
> > from the session store and never touched again.
> >
> > At the beginning I was thinking that my pipeline was not set up
> > correctly; however, I've tried to follow the docs slavishly and I could
> > not find where things can go wrong.
> >
> > Do you have some hints about this?
> > Please let me know if you need more info.
> >
> > thanks a lot,
> > Marco