Hello Eno,

Yes, it is 1-second width (I unintentionally wrote 1-minute in my previous
mail).
Just to give you more info about the data: location data has 1-second
resolution, while user activity data arrives at a varying rate; there may be
10-100 records, or none at all, within 1 second.
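
To make the 1-second width concrete, here is a minimal sketch of how I understand the join window: an activity record pairs with every location record (same key) whose event timestamp is at most 1000 ms away on either side. This is plain Java, not Kafka Streams code; the class and method names are hypothetical, and the inclusive bounds are my assumption.

```java
import java.util.ArrayList;
import java.util.List;

public class JoinWindowSketch {
    // Two records are join candidates when their event timestamps differ
    // by at most windowMs (symmetric window, bounds assumed inclusive).
    static boolean withinWindow(long activityTs, long locationTs, long windowMs) {
        return Math.abs(activityTs - locationTs) <= windowMs;
    }

    // Returns the location timestamps that would pair with one activity record.
    static List<Long> matches(long activityTs, long[] locationTs, long windowMs) {
        List<Long> out = new ArrayList<>();
        for (long ts : locationTs) {
            if (withinWindow(activityTs, ts, windowMs)) {
                out.add(ts);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long[] locations = {0L, 1000L, 2000L, 3000L}; // one location record per second
        // An activity at t=1500 ms falls between two location records.
        System.out.println(matches(1500L, locations, 1000L)); // prints [1000, 2000]
    }
}
```

With 1-second location resolution and a 1-second window, each activity record should therefore find at least one location record, provided both streams have reached the same event time.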

In the meantime I tried to start my reprocessing stage by syncing the two
streams (location data and user data).
Apparently it works and joins are performed continuously.
What I'm experiencing now is that after the join I have a session window
aggregation that is creating new sessions at a very high rate. Right now the
stream app has ingested 6k activity data records; however, more than 500k
session windows have been created. Among these 500k sessions, around 100
(one hundred) have been aggregated with the MySession::aggregateSessions
function.
Also, the CPU is at 100% usage.
In this same stream app, at the beginning of March, Damian Guy found an
issue (related to caching and the session store) that he managed to fix right
after. Right now I'm using the trunk version of Kafka 0.10.2.0.
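
For reference, this is a minimal sketch of the behaviour I would expect from an inactivity-gap session window for a single key: a new session starts only when the gap to the previous record exceeds the inactivity gap. It is plain Java and deliberately ignores out-of-order merging and intermediate updates, so it is not what Kafka Streams does internally; the class and method names are hypothetical.

```java
import java.util.Arrays;

public class SessionGapSketch {
    // Counts sessions for ONE key: a new session starts whenever the gap to the
    // previous record's timestamp is larger than the inactivity gap.
    static int countSessions(long[] timestampsMs, long inactivityGapMs) {
        if (timestampsMs.length == 0) {
            return 0;
        }
        long[] sorted = timestampsMs.clone();
        Arrays.sort(sorted); // simplified model: process records in event-time order
        int sessions = 1;
        for (int i = 1; i < sorted.length; i++) {
            if (sorted[i] - sorted[i - 1] > inactivityGapMs) {
                sessions++;
            }
        }
        return sessions;
    }

    public static void main(String[] args) {
        long gapMs = 5 * 60 * 1000L; // 5 minutes, as in my configuration
        long[] ts = {0L, 60_000L, 120_000L, 900_000L, 930_000L};
        System.out.println(countSessions(ts, gapMs)); // prints 2
    }
}
```

Under this simplified model the number of sessions can never exceed the number of records, which is why 500k session windows for 6k records looks wrong to me.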

The complete code is as follows:


//properties
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "UserSessionWithLocation");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
Serdes.String().getClass().getName());
props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
MySessionSerde.class.getName());
props.put(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG,
86400000);
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG),
100);
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG),
30000);
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG),
"earliest");

//Other Stream: User Location, is a string with the name of the city
//the user is (like "San Francisco")

KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> userLocationStream = locationStreamBuilder
    .stream(stringSerde, stringSerde,"userLocationStreamData");
KStream<String, String> locationKstream = userLocationStream
    .map(MyStreamUtils::enhanceWithAreaDetails);
locationKstream.to("user_location");

//This Stream: User Activity
KStream<String, JsonObject> activity = builder.stream(stringSerde,
jsonSerde, "activityStreamData");
activity.filter(MyStreamUtils::filterOutFakeUsers)
    .map(MyStreamUtils::enhanceWithScoreDetails)
    .join(
        locationKstream,
        MyStreamUtils::locationActivityJoiner,
        JoinWindows.of(1000).until(1000 * 60 * 5),
        stringSerde, jsonSerde, stringSerde)
    .through("usersWithLocation")

    .map(MySession::enhanceWithUserId_And_PutUserIdAsKey)
    .groupByKey(stringSerde, jsonSerde)
    .aggregate(
        MySession::new,
        MySession::aggregateSessions,
        MySession::mergeSessions,
        SessionWindows
            .with(WINDOW_INACTIVITY_GAPS_MS) //5 minutes
            .until(WINDOW_MAINTAIN_DURATION_MS), // 7 minutes
        "aggregate_store")
    .to(windowedSerde, mySessionSerde, SINK_TOPIC_KTABLE);

KafkaStreams stream = new KafkaStreams(builder, props);
stream.start();


Do you see any issue here? Thanks a lot.
Marco


2017-04-18 13:14 GMT+02:00 Eno Thereska <eno.there...@gmail.com>:

> Hi Marco,
>
> I noticed your window is 1 second width, not 1 minute width. Is that
> intentional?
>
> Thanks
> Eno
>
> On 17 Apr 2017, at 19:41, Marco Abitabile <marco.abitab...@gmail.com>
> wrote:
>
> Hello Eno,
>
> thanks for your support. The two streams are both KStreams. The window is
> 1 minute wide, until 5 minutes. This is the code:
>
>
> //Other Stream: User Location, is a string with the name of the city
> //the user is (like "San Francisco")
>
> KStreamBuilder builder = new KStreamBuilder();
> KStream<String, String> userLocationStream = locationStreamBuilder
>     .stream(stringSerde, stringSerde,"userLocationStreamData");
> KStream<String, String> locationKstream = userLocationStream
>     .map(MyStreamUtils::enhanceWithAreaDetails);
> locationKstream.to("user_location");
>
> //This Stream: User Activity
> KStream<String, JsonObject> activity = builder.stream(stringSerde, jsonSerde, 
> "activityStreamData");
> activity.filter(MyStreamUtils::filterOutFakeUsers)
>     .map(MyStreamUtils::enhanceWithScoreDetails)
>     .join(
>         locationKstream,
>         MyStreamUtils::locationActivityJoiner,
>         JoinWindows.of(1000).until(1000 * 60 * 5),
>         stringSerde, jsonSerde, stringSerde)
>     .to("usersWithLocation");
>
> KafkaStreams stream = new KafkaStreams(builder, propsActivity);
> stream.start();
>
>
> And MyStreamUtils::locationActivityJoiner does:
> public static JsonObject locationActivityJoiner(JsonObject activity, String loc) {
>     JsonObject join = activity.copy();
>     join.put("city" , loc);
>     return join;
> }
>
>
> Hmm... your question got me thinking... are you telling me that since
> both are KStreams, they actually need to be re-streamed in sync?
>
> Thanks a lot.
>
> Marco
>
>
> 2017-04-16 21:45 GMT+02:00 Eno Thereska <eno.there...@gmail.com>:
>
>> Hi Marco,
>>
>> Could you share a bit of your code, or at a minimum provide some info on:
>> - are userActivitiesStream and geoDataStream KStreams or KTables?
>> - what is the length of "timewindow"?
>>
>> Thanks
>> Eno
>>
>> > On 16 Apr 2017, at 19:44, Marco Abitabile <marco.abitab...@gmail.com>
>> wrote:
>> >
>> > Hi All!
>> >
>> > I need a little hint to understand how join works with regard to stream
>> > synchronization.
>> >
>> > This mail is a bit long, I need to explain the issue I'm facing.
>> >
>> > *TL;DR:*
>> > it seems that join synchronization between streams is not respected as
>> > explained here:
>> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-28+-+Add+a+processor+client#KIP-28-Addaprocessorclient-StreamSynchronization
>> >
>> > *The need:*
>> > I have historical data residing in some databases, more specifically:
>> >  - time series of user activities
>> >  - time series of user geo positions
>> >
>> > *What I do:*
>> > I have a new algorithm I want to try, but the historical data has already
>> > been pruned by the kafka retention policy, so I only have it in a database.
>> > This is what I'm doing:
>> >  1- spin up a kafka-connect source that takes historical gps data (let's
>> > say, one day of data), ordered by event time, and pushes it into the
>> > "HistoricalGpsData" topic. This task pushes historical geo data into the
>> > kafka topic as fast as possible, respecting the original event time.
>> >  2- spin up a kafka-connect source that takes historical user activities
>> > (let's say, one day of data, the same day as the gps data, of course),
>> > ordered by event time, and pushes them into the "HistoricalUserActivites"
>> > topic. This task pushes historical user activity data into the kafka topic
>> > as fast as possible, respecting the original event time.
>> >  3- spin up my new stream processor algorithm
>> >
>> > By the nature of the data, the quantity of activity data is much higher
>> > than geo data, so task 1 pushes all the needed geo data into the kafka
>> > topic within a few minutes (around 10 minutes), while the activity data,
>> > since it has a higher volume, takes about 1 hour to be pushed entirely.
>> > --> the two streams are pushed into kafka regardless of their
>> > synchronization (however being aware of their nature, as explained above)
>> >
>> > *What I expect:*
>> > Now, what I would expect is that when I perform the join between the two
>> > streams:
>> >
>> >   userActivitiesStream.join(geoDataStream, timewindow...)
>> >
>> > the join takes the incoming user activities data and joins it with the geo
>> > data respecting the given time window.
>> > By the nature of the data, there is always a match (within the given
>> > timeWindow) between user activities data and geo data (in fact, when this
>> > data arrives in real time, there are no issues at all).
>> >
>> > So I expect that the join picks up the right geo data from the topic
>> > (recall that geo data is pushed into the topic within 10 minutes) and joins
>> > it with the user activities data (recall that user activities data is a
>> > stream that takes around 1 hour).
>> >
>> > *What I get:*
>> > What happens is that only the first few minutes of user data is actually
>> > processed by the join; after that, user data keeps coming in but the join
>> > doesn't join any data anymore.
>> >
>> > It seems that the join doesn't respect the time semantics (configured to be
>> > the default strategy: event time) unless the two streams are synchronized
>> > (actually, this happens in the first minutes, when I start the whole
>> > reprocessing task).
>> >
>> >
>> > Can you help me find the right clue? Do I have to push the two
>> > streams in a synchronized fashion (such as simulating the real-time data
>> > flow, as they came into the system the first time)?
>> >
>> > Thanks for your support.
>> >
>> > Best
>> > Marco
>>
>>
>
>
