Hello Damian, Thanks a lot for your precious support.
I confirm you that your workaround is perfectly working for my use case. I'll be glad to support you to test the original code whenever the issue you've spotted will be solved. Thanks a lot again. Marco. Il 06/mar/2017 16:03, "Damian Guy" <damian....@gmail.com> ha scritto: > Hi Marco, > > I've done some testing and found that there is a performance issue when > caching is enabled. I suspect his might be what you are hitting. It looks > to me that you can work around this by doing something like: > > final StateStoreSupplier<SessionStore> sessionStore = > Stores.create(*"session-store-name"*) > .withKeys(Serdes.String()) > .withValues(mySessionSerde) > .persistent() > .sessionWindowed(TimeUnit.MINUTES.toMillis(7)) > .build(); > > > And then in your call to aggregate, pass in the sessionStore created above, > i.e., > > aggregate( > MySession::new, > MySession::aggregateSessions, > MySession::mergeSessions, > SessionWindows > .with(WINDOW_INACTIVITY_GAPS_MS), > mySessionSerde, > sessionStore) > > > Let us know how you get on. > > Thanks, > Damian > > On Mon, 6 Mar 2017 at 13:27 Marco Abitabile <marco.abitab...@gmail.com> > wrote: > > > Thanks Damian, > > > > sure, you are right, these details are modified to be compliant with my > > company rules. However the main points are unchanged. > > > > The producer of the original data is a "data ingestor" that attach few > > extra fields and produces a message such as: > > > > row = new JsonObject({ > > "id" : 12345654, > > "userDeviceId" : "", > > "creationTime" : 1488801350660 //produced from the remote source > > "receivedTime": 1488801363455 //placed by my data ingestor, > > "extra_data1" : 123, // > > "extra_data2" : 456 // extra data specific for my domain all this data > > are numbers > > "extra_data2" : 789 // > > }) > > > > then it sends records into SOURCE_TOPIC (that in this context is > > USER_ACTIVITIES_TOPIC) as follow: > > > > long creationTimestamp = row.getLong("creationTime"); > > long rowId = row.getLong("id"); > > ProducerRecord<String, String> producerRecord = new > > ProducerRecord<>(USER_ACTIVITIES_TOPIC, 0, creationTimestamp, rowId, > > row.toString()); > > producer.send(producerRecord); > > > > Noteworthy: > > - I'm using only one partition (right now. I'm still not in production > and > > i'm discovering the feature) in production environment I would use more > > partitions > > - the message is a simple string containing json object (i'm not using > Avro > > or similar) > > > > - in my streaming application: > > > > public class MySession{ > > > > private final JsonObject sessionDetails; > > > > public MySession(){ > > this.sessionDetails = new JsonObject(); > > } > > > > public static KeyValue enhanceWithUserId_And_PutUserIdAsKey(String > k, > > JsonObject j) { > > int userId = cache.get(j.get("userDeviceId")); > > return KeyValue.pair(userId, j); > > } > > > > public static MySession aggregate(String key, JsonObject value, > > MySession aggregate) { > > //basically MySession is a collection of all the raw data that > the > > session is composed of > > aggregate.addRawData(value); > > return aggregate; > > } > > > > public static MySession merge(String key, MySession arg1, MySession > > arg2) > > { > > arg2.merge(arg1); > > return arg2; > > } > > > > } > > > > > > BTW (this will be a topic for another thread anyway...) is there a way to > > be con control of MySession lifecycle? I was thinking to pool them to > > reduce GC workload. > > > > thanks a lot for your precious help. > > > > Marco > > > > 2017-03-06 11:59 GMT+01:00 Damian Guy <damian....@gmail.com>: > > > > > Hi Marco, > > > > > > Your config etc look ok. > > > > > > 1. It is pretty hard to tell what is going on from just your code > below, > > > unfortunately. But the behaviour doesn't seem to be inline with what > I'm > > > reading in the streams code. For example your MySession::new function > > > should be called once per record. The merger and aggregator should be > > > called pretty much immediately after that. > > > > > > 2. Data will be retained for a bit longer than the value used in > > > SessionWindows.until(..). The session store has 3 segments and we use > the > > > retention period (i.e., value of until()) to determine the segment > > length. > > > The segment length is calculated as: > > > > > > Math.max(retentionPeriod / (numSegments - 1), MIN_SEGMENT_INTERVAL); > > > > > > Which in this case is 210000 milliseconds. So maintaining 3 segments > > means > > > there could be data that is about 10 minutes old. > > > > > > Also this is completely driven by the data and specifically the time > > > extracted from the data. I'm not sure if you can provide a sample of > the > > > data going through the system? It might be helpful in trying to debug > the > > > issue. (I'm not seeing anything obvious in the code). > > > Also it might help if you can get some stack traces on the streams > > > instances that appear to be stuck. > > > > > > Thanks, > > > Damian > > > On Mon, 6 Mar 2017 at 09:59 Marco Abitabile <marco.abitab...@gmail.com > > > > > wrote: > > > > > > > Hello, > > > > > > > > I'm playing around with the brand new SessionWindows. I have a simple > > > > topology such as: > > > > > > > > KStream<String, JsonObject> sess = > > > > builder.stream(stringSerde, jsonSerde, SOURCE_TOPIC); > > > > sess > > > > .map(MySession::enhanceWithUserId_And_PutUserIdAsKey) > > > > .groupByKey(stringSerde, jsonSerde) > > > > .aggregate( > > > > MySession::new, > > > > MySession::aggregateSessions, > > > > MySession::mergeSessions, > > > > SessionWindows > > > > .with(WINDOW_INACTIVITY_GAPS_MS) > > > > .until(WINDOW_MAINTAIN_DURATION_MS), > > > > .filter(MySession::filterOutZeroLenghtSessions) > > > > .to(windowedSerde, mySessionSerde, SINK_TOPIC_KTABLE); > > > > > > > > these are the most important configuration I'm using, all the other > > > configs > > > > are the classical serdes and hosts props: > > > > > > > > private static final String WINDOW_INACTIVITY_GAPS_MS = 5_MINUTES > > > > private static final String WINDOW_MAINTAIN_DURATION_MS = 5_MINUTES + > > > > 2_MINUTES; > > > > > > > > private static final Properties props = new Properties(); > > > > > > > > > > props.put(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_ > > > CONFIG, > > > > ONE_DAY); > > > > > > > > The source stream has data arriving at around 100 messages/second > > > > > > > > I'm experiencing this behaviours: > > > > > > > > 1) MySession::new is called thousands of times, way way more of the > > > number > > > > of messages ingested (around 100 / 1000 times more) the most of this > > > > sessions never reach the end of the pipeline (even if I remove > > > > .filter(MySession::filterOutZeroLenghtSessions) ) and nor > > > > MySession::aggregateSessions > > > > and MySession::mergeSessions are invoked. > > > > > > > > Is this correct? I don't understand, maybe I've setup something > > wrong... > > > > > > > > 2) I can see that the stream pipeline can ingest the first 15 minutes > > of > > > > data and sessions that reach SINK_TOPIC_KTABLE looks good. However: > > > > - every second that passes the pipeline gets slower and slower and > > > > - I can see new updates to old sessions also after > > > > .until(WINDOW_MAINTAIN_DURATION_MS) > > > > period. > > > > - the stream consumer starts to ingest new data with slower and > > slower > > > > rates as time passes, eventually reaching almost 0msg/sec > > > > > > > > I was expecting that after WINDOW_MAINTAIN_DURATION_MS i can see only > > new > > > > sessions and those that have been fired, will just be removed from > > > session > > > > store and never touched again. > > > > > > > > > > > > At the beginning I was thinking that my pipeline was not setup > > correctly, > > > > however I've tried to follow slavishly the docs and I could not find > > > where > > > > things can go wrong. > > > > > > > > Do you have some hints about this? > > > > Please let me know if you need more info about. > > > > > > > > thanks a lot, > > > > Marco > > > > > > > > > >