Hello Eno, thanks for your support. The two streams are both KStreams. The join window is 1 second wide, with state retained for 5 minutes (JoinWindows.of(1000).until(1000 * 60 * 5)). This is the code:
// Other stream: user location, a String with the name of the city the
// user is in (like "San Francisco")
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> userLocationStream = locationStreamBuilder
    .stream(stringSerde, stringSerde, "userLocationStreamData");
KStream<String, String> locationKstream = userLocationStream
    .map(MyStreamUtils::enhanceWithAreaDetails);
locationKstream.to("user_location");

// This stream: user activity
KStream<String, JsonObject> activity =
    builder.stream(stringSerde, jsonSerde, "activityStreamData");
activity.filter(MyStreamUtils::filterOutFakeUsers)
        .map(MyStreamUtils::enhanceWithScoreDetails)
        .join(
            locationKstream,
            MyStreamUtils::locationActivityJoiner,
            JoinWindows.of(1000).until(1000 * 60 * 5),
            stringSerde,
            jsonSerde,
            stringSerde)
        .to("usersWithLocation");

KafkaStreams stream = new KafkaStreams(builder, propsActivity);
stream.start();

And MyStreamUtils::locationActivityJoiner does:

public static JsonObject locationActivityJoiner(JsonObject activity, String loc) {
    JsonObject join = activity.copy();
    join.put("city", loc);
    return join;
}

Hmm... your question is making me think... are you telling me that, since
both are KStreams, they actually need to be re-streamed in sync?

Thanks a lot.
Marco

2017-04-16 21:45 GMT+02:00 Eno Thereska <eno.there...@gmail.com>:

> Hi Marco,
>
> Could you share a bit of your code, or at a minimum provide some info on:
> - are userActivitiesStream and geoDataStream a KStream or a KTable?
> - what is the length of "timewindow"?
>
> Thanks
> Eno
>
>
> > On 16 Apr 2017, at 19:44, Marco Abitabile <marco.abitab...@gmail.com>
> > wrote:
> >
> > Hi All!
> >
> > I need a little hint to understand how join works, with regard to
> > stream synchronization.
> >
> > This mail is a bit long; I need to explain the issue I'm facing.
> >
> > *TL-TR:*
> > it seems that the join synchronization between streams is not respected
> > as explained here:
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-28+-+Add+a+processor+client#KIP-28-Addaprocessorclient-StreamSynchronization
> >
> > *The need:*
> > I have historical data residing in some databases, more specifically:
> > - time series of user activities
> > - time series of user geo positions
> >
> > *What I do:*
> > Since I have a new algorithm I want to try, and the historical data has
> > already been pruned by the Kafka retention policy, I keep it in a
> > database. This is what I'm doing:
> > 1- spin up a Kafka Connect source task that takes historical GPS data
> > (let's say, one day of data), ordered by event time, and pushes it into
> > the "HistoricalGpsData" topic. This task pushes the historical geo data
> > as fast as possible into the Kafka topic, preserving the original event
> > time.
> > 2- spin up a Kafka Connect source task that takes historical user
> > activities (let's say, one day of data, the same day as the GPS data,
> > of course), ordered by event time, and pushes them into the
> > "HistoricalUserActivites" topic. This task pushes the historical user
> > activity data as fast as possible into the Kafka topic, preserving the
> > original event time.
> > 3- spin up my new stream processor algorithm
> >
> > As per the nature of the data, the volume of activity data is much
> > higher than that of geo data, thus task 1 pushes all the needed geo
> > data into its Kafka topic within a few minutes (around 10 minutes),
> > while the activity data, having a higher volume, takes around 1 hour
> > to push entirely.
> > --> the two streams are pushed into Kafka regardless of their
> > synchronization (but being aware of their nature, as explained above)
> >
> > *What I expect:*
> > Now, what I would expect is that when I perform the join between the
> > two streams:
> >
> > userActivitiesStream.join(geoDataStream, timewindow...)
> >
> > the join takes the incoming user activity data and joins it with the
> > geo data within the given time window.
> > As per the nature of the data, there is always a match (within the
> > given timeWindow) between user activity data and geo data (in fact,
> > when this data arrives in real time, there are no issues at all).
> >
> > So, I expect that the join picks up the right geo data from the topic
> > (recall that the geo data is pushed into the topic within 10 minutes)
> > and joins it with the user activity data (recall that the user activity
> > data is a stream that takes around 1 hour).
> >
> > *What I get:*
> > What happens is that only the first few minutes of user data are
> > actually processed by the join; after that, user data keeps coming in
> > but the join doesn't emit any data anymore.
> >
> > It seems that the join doesn't respect the time semantics (configured
> > to be the default strategy: event time) unless the two streams are
> > synchronized (actually, this is the case during the first minutes,
> > when I start the whole reprocessing tasks).
> >
> > Can you help me find the right clue? Do I have to push the two streams
> > in a synchronized fashion (such as simulating the real-time data flow,
> > as it originally arrived in the system)?
> >
> > Thanks for your support.
> >
> > Best,
> > Marco
> >
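To illustrate the semantics discussed in this thread: with JoinWindows.of(1000).until(1000 * 60 * 5), two records join only when their event times are within 1 second of each other, and the window state needed for joining is kept for only 5 minutes of stream time. The following is a minimal plain-Java sketch of those two predicates (no Kafka dependency; the class and method names are illustrative, not Kafka APIs):

```java
// Plain-Java sketch of the event-time checks behind a windowed KStream-KStream
// join configured as JoinWindows.of(1000).until(1000 * 60 * 5).
public class JoinWindowSketch {

    // Two records can join only if their event times differ by at most windowMs.
    public static boolean inWindow(long activityTs, long locationTs, long windowMs) {
        return Math.abs(activityTs - locationTs) <= windowMs;
    }

    // A location record's window state is kept only while stream time has not
    // advanced past its timestamp plus the retention period (.until(...)).
    public static boolean stillRetained(long streamTime, long locationTs, long retentionMs) {
        return streamTime <= locationTs + retentionMs;
    }

    public static void main(String[] args) {
        long windowMs = 1_000L;         // JoinWindows.of(1000): 1-second window
        long retentionMs = 5 * 60_000L; // .until(1000 * 60 * 5): 5-minute retention

        long locationTs = 10_000L;      // a location event at t = 10s

        System.out.println(inWindow(10_500L, locationTs, windowMs)); // true: 0.5s apart
        System.out.println(inWindow(70_000L, locationTs, windowMs)); // false: 60s apart

        // If the activity stream races one hour ahead of the location stream,
        // the 5-minute retention has long expired for this location record:
        System.out.println(stillRetained(3_600_000L, locationTs, retentionMs)); // false
    }
}
```

This matches the symptom described above: while the two historical streams are roughly in sync, activity records find location state within the window; once the faster-loaded location topic is exhausted and stream time advances well past the 5-minute retention, later activity records no longer find any retained location state to join against.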