Hi Marco,

Absolutely no problem at all. I'm glad it work it out. For reference here
is a PR that fixes the problem:
https://github.com/apache/kafka/pull/2645 and the associated JIRA
https://issues.apache.org/jira/browse/KAFKA-4851

Thanks,
Damian

On Tue, 7 Mar 2017 at 17:48 Marco Abitabile <marco.abitab...@gmail.com>
wrote:

> Hello Damian,
>
> Thanks a lot for your precious support.
>
> I confirm you that your workaround is perfectly working for my use case.
>
> I'll be glad to support you to test the original code whenever the issue
> you've spotted will be solved.
>
> Thanks a lot again.
>
> Marco.
>
> Il 06/mar/2017 16:03, "Damian Guy" <damian....@gmail.com> ha scritto:
>
> > Hi Marco,
> >
> > I've done some testing and found that there is a performance issue when
> > caching is enabled. I suspect his might be what you are hitting. It looks
> > to me that you can work around this by doing something like:
> >
> > final StateStoreSupplier<SessionStore> sessionStore =
> > Stores.create(*"session-store-name"*)
> >     .withKeys(Serdes.String())
> >     .withValues(mySessionSerde)
> >     .persistent()
> >     .sessionWindowed(TimeUnit.MINUTES.toMillis(7))
> >     .build();
> >
> >
> > And then in your call to aggregate, pass in the sessionStore created
> above,
> > i.e.,
> >
> > aggregate(
> >     MySession::new,
> >     MySession::aggregateSessions,
> >     MySession::mergeSessions,
> >     SessionWindows
> >         .with(WINDOW_INACTIVITY_GAPS_MS),
> >     mySessionSerde,
> >     sessionStore)
> >
> >
> > Let us know how you get on.
> >
> > Thanks,
> > Damian
> >
> > On Mon, 6 Mar 2017 at 13:27 Marco Abitabile <marco.abitab...@gmail.com>
> > wrote:
> >
> > > Thanks Damian,
> > >
> > > sure, you are right, these details are modified to be compliant with my
> > > company rules. However the main points are unchanged.
> > >
> > > The producer of the original data is a "data ingestor" that attach few
> > > extra fields and produces a message such as:
> > >
> > > row = new JsonObject({
> > >   "id" : 12345654,
> > >   "userDeviceId" : "",
> > >   "creationTime" : 1488801350660 //produced from the remote source
> > >   "receivedTime": 1488801363455 //placed by my data ingestor,
> > >   "extra_data1" : 123, //
> > >   "extra_data2" : 456  // extra data specific for my domain all this
> data
> > > are numbers
> > >   "extra_data2" : 789  //
> > > })
> > >
> > > then it sends records into SOURCE_TOPIC (that in this context is
> > > USER_ACTIVITIES_TOPIC) as follow:
> > >
> > > long creationTimestamp = row.getLong("creationTime");
> > > long rowId = row.getLong("id");
> > > ProducerRecord<String, String> producerRecord = new
> > > ProducerRecord<>(USER_ACTIVITIES_TOPIC, 0, creationTimestamp, rowId,
> > > row.toString());
> > > producer.send(producerRecord);
> > >
> > > Noteworthy:
> > > - I'm using only one partition (right now. I'm still not in production
> > and
> > > i'm discovering the feature) in production environment I would use more
> > > partitions
> > > - the message is a simple string containing json object (i'm not using
> > Avro
> > > or similar)
> > >
> > > - in my streaming application:
> > >
> > > public class MySession{
> > >
> > >     private final JsonObject sessionDetails;
> > >
> > >     public MySession(){
> > >         this.sessionDetails = new JsonObject();
> > >     }
> > >
> > >     public static KeyValue enhanceWithUserId_And_PutUserIdAsKey(String
> > k,
> > > JsonObject j) {
> > >         int userId = cache.get(j.get("userDeviceId"));
> > >         return KeyValue.pair(userId, j);
> > >     }
> > >
> > >     public static MySession aggregate(String key, JsonObject value,
> > > MySession aggregate) {
> > >         //basically MySession is a collection of all the raw data that
> > the
> > > session is composed of
> > >         aggregate.addRawData(value);
> > >         return aggregate;
> > >     }
> > >
> > >     public static MySession merge(String key, MySession arg1, MySession
> > > arg2)
> > > {
> > >         arg2.merge(arg1);
> > >         return arg2;
> > >     }
> > >
> > > }
> > >
> > >
> > > BTW (this will be a topic for another thread anyway...) is there a way
> to
> > > be con control of MySession lifecycle? I was thinking to pool them to
> > > reduce GC workload.
> > >
> > > thanks a lot for your precious help.
> > >
> > > Marco
> > >
> > > 2017-03-06 11:59 GMT+01:00 Damian Guy <damian....@gmail.com>:
> > >
> > > > Hi Marco,
> > > >
> > > > Your config etc look ok.
> > > >
> > > > 1. It is pretty hard to tell what is going on from just your code
> > below,
> > > > unfortunately. But the behaviour doesn't seem to be inline with what
> > I'm
> > > > reading in the streams code. For example your MySession::new function
> > > > should be called once per record. The merger and aggregator should be
> > > > called pretty much immediately after that.
> > > >
> > > > 2. Data will be retained for a bit longer than the value used in
> > > > SessionWindows.until(..). The session store has 3 segments and we use
> > the
> > > >  retention period (i.e., value of until()) to determine the segment
> > > length.
> > > > The segment length is calculated as:
> > > >
> > > >  Math.max(retentionPeriod / (numSegments - 1), MIN_SEGMENT_INTERVAL);
> > > >
> > > > Which in this case is 210000 milliseconds. So maintaining 3 segments
> > > means
> > > > there could be data that is about 10 minutes old.
> > > >
> > > > Also this is completely driven by the data and specifically the time
> > > > extracted from the data. I'm not sure if you can provide a sample of
> > the
> > > > data going through the system? It might be helpful in trying to debug
> > the
> > > > issue. (I'm not seeing anything obvious in the code).
> > > > Also it might help if you can get some stack traces on the streams
> > > > instances that appear to be stuck.
> > > >
> > > > Thanks,
> > > > Damian
> > > > On Mon, 6 Mar 2017 at 09:59 Marco Abitabile <
> marco.abitab...@gmail.com
> > >
> > > > wrote:
> > > >
> > > > > Hello,
> > > > >
> > > > > I'm playing around with the brand new SessionWindows. I have a
> simple
> > > > > topology such as:
> > > > >
> > > > > KStream<String, JsonObject> sess =
> > > > >  builder.stream(stringSerde, jsonSerde, SOURCE_TOPIC);
> > > > > sess
> > > > >     .map(MySession::enhanceWithUserId_And_PutUserIdAsKey)
> > > > >     .groupByKey(stringSerde, jsonSerde)
> > > > >     .aggregate(
> > > > >         MySession::new,
> > > > >         MySession::aggregateSessions,
> > > > >         MySession::mergeSessions,
> > > > >         SessionWindows
> > > > >             .with(WINDOW_INACTIVITY_GAPS_MS)
> > > > >             .until(WINDOW_MAINTAIN_DURATION_MS),
> > > > >     .filter(MySession::filterOutZeroLenghtSessions)
> > > > >     .to(windowedSerde, mySessionSerde, SINK_TOPIC_KTABLE);
> > > > >
> > > > > these are the most important configuration I'm using, all the other
> > > > configs
> > > > > are the classical serdes and hosts props:
> > > > >
> > > > > private static final String WINDOW_INACTIVITY_GAPS_MS = 5_MINUTES
> > > > > private static final String WINDOW_MAINTAIN_DURATION_MS =
> 5_MINUTES +
> > > > > 2_MINUTES;
> > > > >
> > > > > private static final Properties props = new Properties();
> > > > >
> > > > >
> > >
> props.put(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_
> > > > CONFIG,
> > > > > ONE_DAY);
> > > > >
> > > > > The source stream has data arriving at around 100 messages/second
> > > > >
> > > > > I'm experiencing this behaviours:
> > > > >
> > > > > 1) MySession::new is called thousands of times, way way more of the
> > > > number
> > > > > of messages ingested (around 100 / 1000 times more) the most of
> this
> > > > > sessions never reach the end of the pipeline (even if I remove
> > > > > .filter(MySession::filterOutZeroLenghtSessions) ) and nor
> > > > > MySession::aggregateSessions
> > > > > and MySession::mergeSessions are invoked.
> > > > >
> > > > > Is this correct? I don't understand, maybe I've setup something
> > > wrong...
> > > > >
> > > > > 2) I can see that the stream pipeline can ingest the first 15
> minutes
> > > of
> > > > > data and sessions that reach SINK_TOPIC_KTABLE  looks good.
> However:
> > > > >    - every second that passes the pipeline gets slower and slower
> and
> > > > >    - I can see new updates to old sessions also after
> > > > > .until(WINDOW_MAINTAIN_DURATION_MS)
> > > > > period.
> > > > >    - the stream consumer starts to ingest new data with slower and
> > > slower
> > > > > rates as time passes, eventually reaching almost 0msg/sec
> > > > >
> > > > > I was expecting that after WINDOW_MAINTAIN_DURATION_MS i can see
> only
> > > new
> > > > > sessions and those that have been fired, will just be removed from
> > > > session
> > > > > store and never touched again.
> > > > >
> > > > >
> > > > > At the beginning I was thinking that my pipeline was not setup
> > > correctly,
> > > > > however I've tried to follow slavishly the docs and I could not
> find
> > > > where
> > > > > things can go wrong.
> > > > >
> > > > > Do you have some hints about this?
> > > > > Please let me know if you need more info about.
> > > > >
> > > > > thanks a lot,
> > > > > Marco
> > > > >
> > > >
> > >
> >
>

Reply via email to