Hello Eno,

Thanks for your support. The two streams are both KStreams. The window
is 1 minute wide, with a retention ("until") of 5 minutes. This is the code:


// Other stream: user location, a string with the name of the city the
// user is in (like "San Francisco")

KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> userLocationStream = locationStreamBuilder
    .stream(stringSerde, stringSerde,"userLocationStreamData");
KStream<String, String> locationKstream = userLocationStream
    .map(MyStreamUtils::enhanceWithAreaDetails);
locationKstream.to("user_location");

//This Stream: User Activity
KStream<String, JsonObject> activity = builder.stream(stringSerde,
    jsonSerde, "activityStreamData");
activity.filter(MyStreamUtils::filterOutFakeUsers)
    .map(MyStreamUtils::enhanceWithScoreDetails)
    .join(
        locationKstream,
        MyStreamUtils::locationActivityJoiner,
        JoinWindows.of(1000).until(1000 * 60 * 5),
        stringSerde, jsonSerde, stringSerde)
    .to("usersWithLocation");

KafkaStreams stream = new KafkaStreams(builder, propsActivity);
stream.start();
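
Both JoinWindows.of(...) and until(...) take their arguments in milliseconds, so the 1000 passed to of(...) above is one second, whereas a one-minute window would be 60000. A small sketch of the conversion, using only java.time.Duration (the class and method names are hypothetical and not part of my application):

```java
import java.time.Duration;

// Hypothetical helper: converts human-readable durations to the
// millisecond values that JoinWindows.of(...) and until(...) expect.
class JoinWindowMillis {

    static long seconds(long n) {
        return Duration.ofSeconds(n).toMillis();
    }

    static long minutes(long n) {
        return Duration.ofMinutes(n).toMillis();
    }

    public static void main(String[] args) {
        System.out.println("1 second  = " + seconds(1) + " ms");
        System.out.println("1 minute  = " + minutes(1) + " ms");
        System.out.println("5 minutes = " + minutes(5) + " ms");
    }
}
```

With such a helper, a one-minute window with five minutes of retention could be written as JoinWindows.of(minutes(1)).until(minutes(5)), assuming that is the intended configuration.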


And MyStreamUtils::locationActivityJoiner does:
public static JsonObject locationActivityJoiner(JsonObject activity,
        String loc) {
    JsonObject join = activity.copy();
    join.put("city" , loc);
    return join;
}
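
To show what the joiner is meant to do without pulling in the JSON library, here is a minimal stand-alone sketch in which a Map<String, Object> plays the role of JsonObject. That substitution, the class name and the sample values are assumptions made only for illustration:

```java
import java.util.HashMap;
import java.util.Map;

// Stand-in for the joiner above: Map<String, Object> replaces JsonObject.
class JoinerSketch {

    // Left value is the activity, right value is the city name.
    static Map<String, Object> locationActivityJoiner(Map<String, Object> activity, String loc) {
        // Copy first, so the incoming activity is not modified
        // (the original code calls activity.copy() for the same reason).
        Map<String, Object> join = new HashMap<>(activity);
        join.put("city", loc);
        return join;
    }

    public static void main(String[] args) {
        Map<String, Object> activity = new HashMap<>();
        activity.put("user", "marco");
        activity.put("score", 42);

        Map<String, Object> joined = locationActivityJoiner(activity, "San Francisco");
        System.out.println(joined.get("city"));           // San Francisco
        System.out.println(activity.containsKey("city")); // false
    }
}
```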


Hmm... your question got me thinking: are you telling me that, since
both are KStreams, they actually need to be re-streamed in sync?
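
To make clear what I expect from the join, here is a toy model in plain Java (no Kafka dependency). It pairs records that share a key and whose event timestamps differ by at most the window size, no matter when each record arrived. It is only a sketch of the semantics I have in mind, not of how Kafka Streams implements the join; in particular, window retention is not modelled, and all names and sample values are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of an event-time windowed join between two keyed streams.
class WindowJoinSketch {

    // A keyed record that carries its own event timestamp in milliseconds.
    static final class Event {
        final String key;
        final String value;
        final long timestampMs;

        Event(String key, String value, long timestampMs) {
            this.key = key;
            this.value = value;
            this.timestampMs = timestampMs;
        }
    }

    // Joins each activity with every location that has the same key and an
    // event timestamp at most windowMs away from the activity's timestamp.
    static List<String> join(List<Event> activities, List<Event> locations, long windowMs) {
        List<String> joined = new ArrayList<>();
        for (Event a : activities) {
            for (Event l : locations) {
                boolean sameKey = a.key.equals(l.key);
                boolean inWindow = Math.abs(a.timestampMs - l.timestampMs) <= windowMs;
                if (sameKey && inWindow) {
                    joined.add(a.value + "@" + l.value);
                }
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        List<Event> locations = new ArrayList<>();
        locations.add(new Event("marco", "San Francisco", 0L));
        locations.add(new Event("marco", "Oakland", 3_600_000L));

        List<Event> activities = new ArrayList<>();
        activities.add(new Event("marco", "login", 500L));
        activities.add(new Event("marco", "purchase", 3_600_400L));
        activities.add(new Event("marco", "logout", 7_200_000L));

        // Prints [login@San Francisco, purchase@Oakland]
        System.out.println(join(activities, locations, 1000L));
    }
}
```

In this model the "logout" activity finds no match because no location record lies within one second of it; the speed at which each list was filled plays no role.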

Thanks a lot.

Marco


2017-04-16 21:45 GMT+02:00 Eno Thereska <eno.there...@gmail.com>:

> Hi Marco,
>
> Could you share a bit of your code, or at a minimum provide some info on:
> - are userActivitiesStream and geoDataStream KStreams or KTables?
> - what is the length of "timewindow"?
>
> Thanks
> Eno
>
> > On 16 Apr 2017, at 19:44, Marco Abitabile <marco.abitab...@gmail.com> wrote:
> >
> > Hi All!
> >
> > I need a little hint to understand how join works, with regard to stream
> > synchronization.
> >
> > This mail is a bit long, I need to explain the issue I'm facing.
> >
> > *TL;DR:*
> > it seems that join synchronization between streams is not respected as
> > explained here:
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-28+-+Add+a+processor+client#KIP-28-Addaprocessorclient-StreamSynchronization
> >
> > *The need:*
> > I have historical data residing in some databases, more specifically:
> >  - time series of user activities
> >  - time series of user geo positions
> >
> > *What I do:*
> > I have a new algorithm I want to try, but the historical data has
> > already been pruned by the kafka retention policy, so I only have it in
> > a database. This is what I'm doing:
> >  1- spin up a kafka-connect source that takes historical gps data (let's
> > say, one day of data), ordered by event time, and pushes it into the
> > "HistoricalGpsData" topic. This task pushes historical geo data as fast
> > as possible into the kafka topic, respecting the original event time.
> >  2- spin up a kafka-connect source that takes historical user activities
> > (let's say, one day of data, the same day as the gps data, of course),
> > ordered by event time, and pushes them into the "HistoricalUserActivites"
> > topic. This task pushes historical user activity data as fast as possible
> > into the kafka topic, respecting the original event time.
> >  3- spin up my new stream processor algorithm
> >
> > By the nature of the data, the volume of activity data is much higher
> > than that of geo data, so task 1 pushes all the needed geo data into the
> > kafka topic within a few minutes (around 10 minutes), while the activity
> > data, having a higher volume, takes about 1 hour to be pushed entirely.
> > --> the two streams are pushed into kafka regardless of their
> > synchronization (however being aware of their nature, as explained above)
> >
> > *What I expect:*
> > Now, what I would expect is that when I perform the join between the two
> > streams:
> >
> >   userActivitiesStream.join(geoDataStream, timewindow...)
> >
> > the join takes the incoming user activities data and joins it with the
> > geo data, respecting the given time window.
> > By the nature of the data, there is always a match (within the given
> > timeWindow) between user activities data and geo data (in fact, when this
> > data arrives in real time, there are no issues at all)
> >
> > So, I expect the join to pick up the right geo data from the topic
> > (recall that geo data is pushed into the topic within 10 minutes) and
> > join it with the user activities data (recall that user activities data
> > is a stream that takes around 1 hour)
> >
> > *What I get:*
> > What happens is that only the first few minutes of user data is actually
> > processed by the join; after that, user data keeps coming in but the join
> > doesn't join any data anymore.
> >
> > It seems that the join doesn't respect the time semantics (configured to
> > be the default strategy: event time) unless the two streams are
> > synchronized (actually, this happens in the first minutes, when I start
> > the whole reprocessing task).
> >
> >
> > Can you help me find the right clue? Do I have to push the two
> > streams in a synchronized fashion (such as simulating the real-time data
> > flow, as they came into the system the first time)?
> >
> > Thanks for your support.
> >
> > Best
> > Marco
>
>
