Hi Marco,

Could you share a bit of your code, or at a minimum provide some info on:
- are userActivitiesStream and geoDataStream a KStream or a KTable?
- what is the length of the join "timewindow"?
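In the meantime, here is a toy model of what a KStream-KStream windowed join is supposed to do on event time: two records pair up when they share a key and their event timestamps differ by at most the window length, regardless of when they arrive. This is plain Java, not the Kafka Streams API; the names and the one-minute window are illustrative only.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of KStream-KStream windowed join semantics (NOT the Streams API):
// two records match when they have the same key and their event times
// differ by at most windowMs.
public class WindowedJoinSketch {

    record Event(String key, long eventTimeMs, String value) {}

    static List<String> join(List<Event> left, List<Event> right, long windowMs) {
        List<String> out = new ArrayList<>();
        for (Event l : left) {
            for (Event r : right) {
                if (l.key().equals(r.key())
                        && Math.abs(l.eventTimeMs() - r.eventTimeMs()) <= windowMs) {
                    // Emit the joined value, as a ValueJoiner would.
                    out.add(l.value() + "+" + r.value());
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // An activity at t=120s should match a geo fix at t=100s
        // within a 1-minute window, whatever order they were ingested in.
        List<Event> activities = List.of(new Event("user1", 120_000L, "login"));
        List<Event> geo = List.of(new Event("user1", 100_000L, "45.0,9.0"));
        System.out.println(join(activities, geo, 60_000L)); // [login+45.0,9.0]
    }
}
```

If that is also what you expect, then the question becomes why the runtime is advancing one stream's time past the other's, which is why the KStream/KTable distinction and the window length matter.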
Thanks
Eno

> On 16 Apr 2017, at 19:44, Marco Abitabile <marco.abitab...@gmail.com> wrote:
>
> Hi All!
>
> I need a little hint to understand how join works with regard to stream
> synchronization.
>
> This mail is a bit long; I need to explain the issue I'm facing.
>
> *TL;DR:*
> it seems that join synchronization between streams is not respected as
> explained here:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-28+-+Add+a+processor+client#KIP-28-Addaprocessorclient-StreamSynchronization
>
> *The need:*
> I have historical data residing in some databases, more specifically:
> - a time series of user activities
> - a time series of user geo positions
>
> *What I do:*
> I have a new algorithm I want to try, but the historical data has already
> been pruned by the Kafka retention policy, so I keep it in a database.
> This is what I'm doing:
> 1- spin up a kafka-connect source that takes historical GPS data (let's
> say one day of data), ordered by event time, and pushes it into the
> "HistoricalGpsData" topic. This task pushes the historical geo data as
> fast as possible into the Kafka topic, preserving the original event time.
> 2- spin up a kafka-connect source that takes historical user activities
> (let's say one day of data, the same day as the GPS data, of course),
> ordered by event time, and pushes them into the "HistoricalUserActivites"
> topic. This task pushes the historical user-activity data as fast as
> possible into the Kafka topic, preserving the original event time.
> 3- spin up my new stream processor algorithm
>
> As per the nature of the data, the volume of activity data is much higher
> than that of geo data, so task 1 pushes all the needed geo data into the
> Kafka topic within a few minutes (around 10 minutes), while the activity
> data, having a higher volume, takes around 1 hour to push entirely.
> --> the two streams are pushed into Kafka regardless of their
> synchronization (though being aware of their nature, as explained above).
>
> *What I expect:*
> Now, what I would expect is that when I perform the join between the two
> streams:
>
> userActivitiesStream.join(geoDataStream, timewindow...)
>
> the join takes the incoming user-activity data and joins it with the geo
> data respecting the given time window.
> As per the nature of the data, there is always a match (within the given
> timeWindow) between user-activity data and geo data (in fact, when this
> data arrives in real time, there are no issues at all).
>
> So I expect that the join picks up the right geo data from the topic
> (recall that the geo data is pushed into the topic within 10 minutes) and
> joins it with the user-activity data (recall that the user-activity data
> is a stream that takes around 1 hour to push).
>
> *What I get:*
> What happens is that only the first few minutes of user data is actually
> processed by the join; after that, user data keeps coming in but the join
> doesn't produce any output anymore.
>
> It seems that the join doesn't respect the time semantics (configured to
> be the default strategy: event time) unless the two streams are
> synchronized (indeed, this is what happens in the first minutes, when I
> start the whole reprocessing tasks).
>
>
> Can you help me find the right clue? Do I have to push the two streams in
> a synchronized fashion (such as simulating the real-time data flow, as it
> originally came into the system)?
>
> Thanks for your support.
>
> Best
> Marco
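On Marco's last question above: if replay order does turn out to matter, one way to simulate the original real-time flow is to interleave the two event-time-ordered historical feeds into a single replay sequence before producing, so neither topic runs far ahead of the other. A plain-Java sketch of that merge (not Kafka code; record and topic names are illustrative):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: merge two event-time-sorted historical feeds into one sequence
// in global event-time order, simulating the original real-time arrival.
public class EventTimeMerge {

    record Rec(String topic, long eventTimeMs) {}

    static List<Rec> mergeByEventTime(List<Rec> a, List<Rec> b) {
        List<Rec> out = new ArrayList<>(a.size() + b.size());
        int i = 0, j = 0;
        while (i < a.size() && j < b.size()) {
            // Replay whichever feed has the earlier event time next.
            if (a.get(i).eventTimeMs() <= b.get(j).eventTimeMs()) {
                out.add(a.get(i++));
            } else {
                out.add(b.get(j++));
            }
        }
        // One feed is exhausted; drain the remainder of the other.
        while (i < a.size()) out.add(a.get(i++));
        while (j < b.size()) out.add(b.get(j++));
        return out;
    }

    public static void main(String[] args) {
        List<Rec> geo = List.of(new Rec("geo", 10L), new Rec("geo", 30L));
        List<Rec> acts = List.of(new Rec("act", 20L), new Rec("act", 40L));
        for (Rec r : mergeByEventTime(geo, acts)) {
            System.out.println(r.topic() + "@" + r.eventTimeMs());
        }
        // geo@10, act@20, geo@30, act@40
    }
}
```

Whether this is actually needed depends on how the Streams runtime picks between the two partitions' buffered records, which is why the details asked for above matter.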