Hi Marco,

FYI this has now been merged to trunk and 0.10.2. If you want to try it out, I'd suggest checking out and building the 0.10.2 branch.
Thanks,
Damian

On Tue, 7 Mar 2017 at 17:57 Damian Guy <damian....@gmail.com> wrote:

Hi Marco,

Absolutely no problem at all. I'm glad it worked out. For reference, here is the PR that fixes the problem: https://github.com/apache/kafka/pull/2645 and the associated JIRA: https://issues.apache.org/jira/browse/KAFKA-4851

Thanks,
Damian

On Tue, 7 Mar 2017 at 17:48 Marco Abitabile <marco.abitab...@gmail.com> wrote:

Hello Damian,

Thanks a lot for your precious support. I can confirm that your workaround works perfectly for my use case. I'll be glad to help you test the original code once the issue you've spotted is resolved.

Thanks a lot again.

Marco.

On 06/03/2017 16:03, "Damian Guy" <damian....@gmail.com> wrote:

Hi Marco,

I've done some testing and found that there is a performance issue when caching is enabled. I suspect this might be what you are hitting. It looks to me like you can work around this by doing something like:

    final StateStoreSupplier<SessionStore> sessionStore =
        Stores.create("session-store-name")
              .withKeys(Serdes.String())
              .withValues(mySessionSerde)
              .persistent()
              .sessionWindowed(TimeUnit.MINUTES.toMillis(7))
              .build();

And then in your call to aggregate, pass in the sessionStore created above, i.e.:

    aggregate(
        MySession::new,
        MySession::aggregateSessions,
        MySession::mergeSessions,
        SessionWindows.with(WINDOW_INACTIVITY_GAPS_MS),
        mySessionSerde,
        sessionStore)

Let us know how you get on.

Thanks,
Damian
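Putting the two snippets above together with the topology Marco posts further down the thread, a rough sketch of how the pre-built session store might be wired in. This is illustrative only, not code from the thread: builder, the serdes, MySession and the topic constants are the names Marco uses elsewhere and are assumed to exist; until() is omitted on the assumption that the 7-minute retention set via sessionWindowed(...) on the store plays that role.

    // Sketch only: assumes the serdes, topics and MySession class shown elsewhere in this thread.
    final StateStoreSupplier<SessionStore> sessionStore =
            Stores.create("session-store-name")
                  .withKeys(Serdes.String())
                  .withValues(mySessionSerde)
                  .persistent()
                  .sessionWindowed(TimeUnit.MINUTES.toMillis(7))   // retention, instead of until()
                  .build();

    builder.stream(stringSerde, jsonSerde, SOURCE_TOPIC)
           .map(MySession::enhanceWithUserId_And_PutUserIdAsKey)
           .groupByKey(stringSerde, jsonSerde)
           .aggregate(
                   MySession::new,
                   MySession::aggregateSessions,
                   MySession::mergeSessions,
                   SessionWindows.with(WINDOW_INACTIVITY_GAPS_MS),
                   mySessionSerde,
                   sessionStore)                                   // pre-built store passed in here
           .filter(MySession::filterOutZeroLenghtSessions)
           .to(windowedSerde, mySessionSerde, SINK_TOPIC_KTABLE);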
On Mon, 6 Mar 2017 at 13:27 Marco Abitabile <marco.abitab...@gmail.com> wrote:

Thanks Damian,

Sure, you are right: these details have been modified to comply with my company's rules. However, the main points are unchanged.

The producer of the original data is a "data ingestor" that attaches a few extra fields and produces a message such as:

    row = new JsonObject({
        "id" : 12345654,
        "userDeviceId" : "",
        "creationTime" : 1488801350660,  // produced by the remote source
        "receivedTime" : 1488801363455,  // added by my data ingestor
        "extra_data1" : 123,             // extra data specific to my domain;
        "extra_data2" : 456,             // all of these are numbers
        "extra_data3" : 789
    })

It then sends records into SOURCE_TOPIC (which in this context is USER_ACTIVITIES_TOPIC) as follows:

    long creationTimestamp = row.getLong("creationTime");
    long rowId = row.getLong("id");
    ProducerRecord<String, String> producerRecord = new ProducerRecord<>(
            USER_ACTIVITIES_TOPIC, 0, creationTimestamp, String.valueOf(rowId), row.toString());
    producer.send(producerRecord);

Noteworthy:
- I'm using only one partition right now (I'm not in production yet and am still exploring the feature); in a production environment I would use more partitions.
- The message is a simple string containing a JSON object (I'm not using Avro or similar).

In my streaming application:

    public class MySession {

        private final JsonObject sessionDetails;

        public MySession() {
            this.sessionDetails = new JsonObject();
        }

        public static KeyValue enhanceWithUserId_And_PutUserIdAsKey(String k, JsonObject j) {
            int userId = cache.get(j.get("userDeviceId"));
            return KeyValue.pair(userId, j);
        }

        public static MySession aggregate(String key, JsonObject value, MySession aggregate) {
            // basically MySession is a collection of all the raw data the session is composed of
            aggregate.addRawData(value);
            return aggregate;
        }

        public static MySession merge(String key, MySession arg1, MySession arg2) {
            arg2.merge(arg1);
            return arg2;
        }
    }

BTW (this will be a topic for another thread anyway): is there a way to be in control of the MySession lifecycle? I was thinking of pooling them to reduce GC workload.

Thanks a lot for your precious help.

Marco

2017-03-06 11:59 GMT+01:00 Damian Guy <damian....@gmail.com>:

Hi Marco,

Your config etc. looks OK.

1. It is pretty hard to tell what is going on from just your code below, unfortunately. But the behaviour doesn't seem to be in line with what I'm reading in the streams code. For example, your MySession::new function should be called once per record. The merger and aggregator should be called pretty much immediately after that.

2. Data will be retained for a bit longer than the value used in SessionWindows.until(..). The session store has 3 segments, and we use the retention period (i.e., the value of until()) to determine the segment length. The segment length is calculated as:

    Math.max(retentionPeriod / (numSegments - 1), MIN_SEGMENT_INTERVAL);

which in this case is 210000 milliseconds. So maintaining 3 segments means there could be data that is about 10 minutes old.

Also, this is completely driven by the data, and specifically by the time extracted from the data. I'm not sure if you can provide a sample of the data going through the system? It might be helpful in trying to debug the issue (I'm not seeing anything obvious in the code). It might also help if you can get some stack traces on the streams instances that appear to be stuck.

Thanks,
Damian
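Spelling that calculation out with the numbers from this thread (a worked sketch; 60000 ms is an assumed value for MIN_SEGMENT_INTERVAL and is small enough here that it does not affect the result):

    final long retentionPeriod = TimeUnit.MINUTES.toMillis(7);   // until(5 min + 2 min) = 420000 ms
    final int  numSegments     = 3;
    final long segmentLength   = Math.max(retentionPeriod / (numSegments - 1), 60_000L);
    // segmentLength = 210000 ms (3.5 minutes), matching the figure above
    final long oldestRetained  = numSegments * segmentLength;    // 630000 ms, i.e. roughly 10 minutes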
On Mon, 6 Mar 2017 at 09:59 Marco Abitabile <marco.abitab...@gmail.com> wrote:

Hello,

I'm playing around with the brand new SessionWindows. I have a simple topology such as:

    KStream<String, JsonObject> sess =
        builder.stream(stringSerde, jsonSerde, SOURCE_TOPIC);
    sess
        .map(MySession::enhanceWithUserId_And_PutUserIdAsKey)
        .groupByKey(stringSerde, jsonSerde)
        .aggregate(
            MySession::new,
            MySession::aggregateSessions,
            MySession::mergeSessions,
            SessionWindows
                .with(WINDOW_INACTIVITY_GAPS_MS)
                .until(WINDOW_MAINTAIN_DURATION_MS))
        .filter(MySession::filterOutZeroLenghtSessions)
        .to(windowedSerde, mySessionSerde, SINK_TOPIC_KTABLE);

These are the most important configurations I'm using; all the other configs are the classical serdes and host props:

    private static final long WINDOW_INACTIVITY_GAPS_MS = TimeUnit.MINUTES.toMillis(5);
    private static final long WINDOW_MAINTAIN_DURATION_MS =
        WINDOW_INACTIVITY_GAPS_MS + TimeUnit.MINUTES.toMillis(2);

    private static final Properties props = new Properties();
    props.put(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, ONE_DAY);

The source stream has data arriving at around 100 messages/second.

I'm experiencing these behaviours:

1) MySession::new is called thousands of times, way more than the number of messages ingested (around 100 to 1000 times more). Most of these sessions never reach the end of the pipeline (even if I remove .filter(MySession::filterOutZeroLenghtSessions)), and neither MySession::aggregateSessions nor MySession::mergeSessions is invoked.

Is this correct? I don't understand, maybe I've set something up wrong...

2) I can see that the stream pipeline ingests the first 15 minutes of data, and the sessions that reach SINK_TOPIC_KTABLE look good. However:
- with every second that passes, the pipeline gets slower and slower;
- I can see new updates to old sessions even after the until(WINDOW_MAINTAIN_DURATION_MS) period;
- the stream consumer ingests new data at slower and slower rates as time passes, eventually reaching almost 0 msg/sec.

I was expecting that after WINDOW_MAINTAIN_DURATION_MS I would see only new sessions, and that those already fired would just be removed from the session store and never touched again.

At the beginning I thought my pipeline was not set up correctly, but I have tried to follow the docs closely and could not find where things could go wrong.

Do you have some hints about this? Please let me know if you need more info.

Thanks a lot,
Marco
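For what it's worth, one plausible shape for the MySession::filterOutZeroLenghtSessions predicate referenced in the topology above. This is purely a guess at Marco's helper, not code from the thread; it only relies on the Windowed key that a session-windowed aggregate produces:

    // Hypothetical reconstruction: keep only sessions whose merged session window
    // spans a non-zero interval (a session built from a single record has start == end).
    public static boolean filterOutZeroLenghtSessions(final Windowed<String> key, final MySession session) {
        return key.window().end() > key.window().start();
    }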