Hi Marco,

Could you share a bit of your code, or at a minimum provide some info on:
- are userActivitiesStream and geoDataStream KStreams or KTables?
- what is the length of "timewindow"?
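The window length matters because in a KStream-KStream join (in Kafka Streams, configured with `JoinWindows`) two records with the same key are joined only if their timestamps are at most the window size apart. A KStream-KTable join, by contrast, is not windowed. Below is a minimal toy model of the windowed rule, assuming millisecond event timestamps. `WindowJoinModel`, `Event` and `join` are hypothetical names, not Kafka APIs:

```java
import java.util.ArrayList;
import java.util.List;

// Toy, in-memory model of a windowed inner join between two event streams.
// It is NOT the Kafka Streams implementation; Event and join() are hypothetical.
final class WindowJoinModel {
    // A keyed record carrying its event-time timestamp in milliseconds.
    record Event(String key, long timestampMs, String value) {}

    // Emits "left+right" for every pair with the same key whose timestamps
    // are at most windowMs apart (before or after).
    static List<String> join(List<Event> left, List<Event> right, long windowMs) {
        List<String> out = new ArrayList<>();
        for (Event l : left) {
            for (Event r : right) {
                boolean sameKey = l.key().equals(r.key());
                boolean inWindow = Math.abs(l.timestampMs() - r.timestampMs()) <= windowMs;
                if (sameKey && inWindow) {
                    out.add(l.value() + "+" + r.value());
                }
            }
        }
        return out;
    }
}
```

With a 5-second window, an activity at 1 s joins a GPS fix at 4 s, while an activity at 100 s finds no partner. Shrinking the window to 1 s drops the first match too.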

Thanks
Eno

> On 16 Apr 2017, at 19:44, Marco Abitabile <marco.abitab...@gmail.com> wrote:
> 
> Hi All!
> 
> I need a little hint to understand how the join works with regard to stream
> synchronization.
> 
> This mail is a bit long because I need to explain the issue I'm facing.
> 
> *TL;DR:*
> it seems that stream synchronization in joins does not work as
> described here:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-28+-+Add+a+processor+client#KIP-28-Addaprocessorclient-StreamSynchronization
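The synchronization described in KIP-28 can be pictured as follows: among the records currently buffered from each input, process the one with the smallest timestamp next. Below is a toy sketch of that rule, assuming each input is a simple in-memory buffer. `TimestampSync`, `Rec` and `drain` are hypothetical, and the real Kafka Streams task logic differs in its details:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model of timestamp-based synchronization across input buffers:
// at each step, process the buffered head record with the smallest timestamp.
// An empty buffer is simply skipped, so cross-input ordering is best-effort.
final class TimestampSync {
    record Rec(String source, long ts) {}

    static List<Rec> drain(List<Deque<Rec>> buffers) {
        List<Rec> processed = new ArrayList<>();
        while (true) {
            Deque<Rec> best = null;
            for (Deque<Rec> b : buffers) {
                // Pick the non-empty buffer whose head has the lowest timestamp.
                if (!b.isEmpty() && (best == null || b.peekFirst().ts() < best.peekFirst().ts())) {
                    best = b;
                }
            }
            if (best == null) {
                return processed; // every buffer is empty
            }
            processed.add(best.pollFirst());
        }
    }
}
```

When both buffers hold data, records come out in timestamp order (10, 20, 30). When the activity buffer is empty, for instance because its topic has not been filled yet, nothing is held back and all geo records are processed. In this model, ordering across inputs therefore depends on what has actually been fetched.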
> 
> *The need:*
> I have historical data residing in some databases, more specifically:
>  - time series of user activities
>  - time series of user geo positions
> 
> *What I do:*
> I have a new algorithm I want to try, but the historical data has already
> been pruned by the Kafka retention policy, so I have it only in a database.
> This is what I'm doing:
>  1- spin up a Kafka Connect source connector that reads historical GPS data
> (let's say, one day of data), ordered by event time, and pushes it into the
> "HistoricalGpsData" topic. This task pushes the historical geo data into
> the Kafka topic as fast as possible, preserving the original event time.
>  2- spin up a Kafka Connect source connector that reads historical user
> activities (let's say, one day of data, the same day as the GPS data, of
> course), ordered by event time, and pushes them into the
> "HistoricalUserActivites" topic. This task pushes the historical user
> activity data into the Kafka topic as fast as possible, preserving the
> original event time.
>  3- spin up the stream processor running my new algorithm
> 
> Given the nature of the data, there is much more activity data than geo
> data, so task 1 pushes all the needed geo data into the Kafka topic within a
> few minutes (around 10 minutes), while the activity data, having a higher
> volume, takes about 1 hour to be pushed entirely.
> --> the two streams are pushed into Kafka without any synchronization
> between them (although I am aware of how they differ, as explained above)
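To put numbers on that difference, assume that each connector writes its day of data at a constant rate and that the event timestamps are spread evenly over the day. Both are assumptions, not measurements. The hypothetical `EventTimeSkew` helper below computes how far ahead in event time the geo topic is after a given number of minutes of loading:

```java
// Rough arithmetic, assuming each connector writes its day of data at a constant
// rate and event timestamps are spread evenly over the day. Hypothetical helper.
final class EventTimeSkew {
    static final double DAY_MINUTES = 24 * 60;

    // Event time (minutes into the day) reached by a stream after wallMinutes
    // of loading, when the whole day takes loadMinutes to load.
    static double eventTimeReached(double wallMinutes, double loadMinutes) {
        return Math.min(DAY_MINUTES, wallMinutes * DAY_MINUTES / loadMinutes);
    }

    // How far (in event-time minutes) the geo topic (~10 min load) runs ahead
    // of the activity topic (~60 min load) after wallMinutes of loading.
    static double skewMinutes(double wallMinutes) {
        return eventTimeReached(wallMinutes, 10) - eventTimeReached(wallMinutes, 60);
    }
}
```

Under these assumptions the geo topic is about 120 minutes of event time ahead after one minute of loading. After ten minutes it is 1200 minutes (20 hours) ahead: it has reached the end of the day while the activities are only at hour four. The gap closes only when the activity load finishes.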
> 
> *What I expect:*
> Now, what I would expect is that when I perform the join between the two
> streams:
> 
>   userActivitiesStream.join(geoDataStream, timewindow...)
> 
> the join takes the incoming user activity data and joins it with the geo
> data within the given time window.
> Given the nature of the data, there is always a match (within the given
> timeWindow) between user activity data and geo data (in fact, when this
> data arrives in real time, there are no issues at all).
> 
> So I expect the join to pick up the right geo data from the topic (recall
> that the geo data is pushed into the topic within 10 minutes) and join it
> with the user activity data (recall that the user activity data takes around
> 1 hour to be pushed).
> 
> *What I get:*
> What happens is that only the first few minutes of user data are actually
> processed by the join; after that, user data keeps coming in but the join
> no longer joins any data.
> 
> It seems that the join doesn't respect the time semantics (configured with
> the default strategy: event time) unless the two streams are synchronized
> (which is actually the case during the first minutes, right after I start
> the whole reprocessing task).
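One mechanism that could produce this symptom is offered here as a hypothesis; it is not confirmed in this thread. A windowed join looks up the other side in a local window store, and such a store keeps data only for a retention period measured against the largest timestamp it has seen. If the geo side has already run far ahead in event time, its early records may have been dropped before the matching activities arrive. Below is a toy model of that effect, using minutes as the time unit. `RetainingStore` is hypothetical, and real Kafka Streams stores work differently in detail:

```java
import java.util.ArrayList;
import java.util.List;

// Toy window store: keeps records only while they are within retention of the
// largest timestamp the store has seen ("stream time"). Hypothetical model.
final class RetainingStore {
    private final long retention;
    private final List<Long> timestamps = new ArrayList<>();
    private long streamTime = Long.MIN_VALUE;

    RetainingStore(long retention) {
        this.retention = retention;
    }

    void put(long ts) {
        streamTime = Math.max(streamTime, ts);
        timestamps.add(ts);
        // Drop everything that has fallen out of the retention period.
        timestamps.removeIf(t -> t < streamTime - retention);
    }

    // Retained timestamps that lie within window of the probe time.
    List<Long> fetch(long probeTs, long window) {
        List<Long> hits = new ArrayList<>();
        for (long t : timestamps) {
            if (Math.abs(t - probeTs) <= window) {
                hits.add(t);
            }
        }
        return hits;
    }
}
```

With a 60-minute retention, loading the whole day of geo data first leaves only the last hour in the store. An activity at minute 5 then finds nothing, while the same lookup succeeds when geo and activity records arrive together.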
> 
> 
> Can you give me the right clue? Do I have to push the two streams in a
> synchronized fashion (for example, simulating the real-time data flow as
> it first came into the system)?
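If one did want to replay both topics in a paced way, as asked above, a simple approach is to send each record at an offset proportional to its event-time distance from the first record. Below is a minimal sketch. It assumes the records are already sorted by event time, as the connectors above do. `ReplayPacer` is a hypothetical helper that only computes send times and does not talk to Kafka:

```java
import java.util.ArrayList;
import java.util.List;

// Computes when (in ms, relative to the start of the replay) each historical
// record should be sent so that event time advances at `speedup` times real time.
// Assumes the input is sorted by event time. Hypothetical helper; no Kafka calls.
final class ReplayPacer {
    static List<Long> sendOffsetsMs(List<Long> eventTimestampsMs, double speedup) {
        if (speedup <= 0) {
            throw new IllegalArgumentException("speedup must be > 0");
        }
        List<Long> offsets = new ArrayList<>();
        if (eventTimestampsMs.isEmpty()) {
            return offsets;
        }
        long first = eventTimestampsMs.get(0);
        for (long ts : eventTimestampsMs) {
            offsets.add(Math.round((ts - first) / speedup));
        }
        return offsets;
    }
}
```

Using the same speedup for both topics, for example 24 so that one day replays in about one hour, would keep them at roughly the same event time while they are loaded.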
> 
> Thanks for your support.
> 
> Best
> Marco
