Hi,

Thanks for clarifying. So a Kafka Streams Task attempts to synchronize the timestamps of topics on a best-effort basis.
Then Kafka Clients API users have to maintain similar functionality themselves in order to preserve such behaviour.

Does it mean that if we have two streams with the same number of events, and assuming there is no outside influence on the business logic, applying the same business logic from offset 0 on both streams will always produce the same result?

Thanks

On 27 Apr 2017 14:29, "Damian Guy" <damian....@gmail.com> wrote:

> As I said in my previous email:
>
> > Yes. When streams are joined each partition from the joined streams are
> > grouped together into a single Task. Each Task maintains a record buffer
> > for all of the topics it is consuming from. When it is time to process a
> > record it will choose a record from the partition that has the smallest
> > timestamp. So in this way it makes a best effort to keep the streams in
> > sync.
>
> So if you are joining two topics, they are both starting from offset 0 with
> no local state, they will be consumed at roughly the same rate and kept in
> sync based on the time extracted from the records.
>
> On Thu, 27 Apr 2017 at 08:02 Murad Mamedov <m...@muradm.net> wrote:
>
> > Hi Devs,
> >
> > Maybe you can shed some light.
> >
> > Thanks in advance
> >
> > ---------- Forwarded message ----------
> > From: "m...@muradm.net" <m...@muradm.net>
> > Date: 26 Apr 2017 19:53
> > Subject: Re: Time synchronization between streams
> > To: <us...@kafka.apache.org>
> > Cc:
> >
> > > Yes, basically I'm ok with how join works including window and retention
> > > periods, under normal circumstances.
> > > In real time of occurrence of events, application joining streams will
> > > get something like this:
> > >
> > > T1 + 0      => topic_small (K1, V1)  => join result (None)
> > > T1 + 1 min  => topic_large (K1, VT1) => join result (K1, V1, VT1)
> > > T1 + 3 mins => topic_large (K1, VT2) => join result (K1, V1, VT2)
> > > T1 + 7 mins => topic_small (K1, V2)  => join result (K1, V2, VT2)
> > >
> > > According to Windowed<K> and WindowedSerializer it keeps only start of
> > > window with key when storing it to state store. Assuming that window start
> > > time same for both topics/KStreams (not sure yet, still reading source),
> > > but even if not same, state stores actions of Kafka Streams will be like
> > > this:
> > >
> > > join_left_side_store.put ( K1-W1, V1 )
> > > join_right_side_store.put ( K1-W1, VT1 )
> > > join_left_side_store.put ( K1-W1, V2 )
> > > join_right_side_store.put ( K1-W1, VT2 )
> > >
> > > However when consuming same topics by the same application from beginning
> > > from scratch (no application local state stores) for large period of time
> > > (greater than window period, but less than retention period), join result
> > > for 10 minutes window will be different, like this:
> > >
> > > join result (None)
> > > join result (K1, V2, VT1)
> > > join result (K1, V2, VT2)
> > >
> > > Because topic_large's stream is being read slower, value of topic_small in
> > > window will change from V1 to V2, before Kafka Streams will receive VT1.
> > >
> > > I.e. state stores actions of Kafka Streams will be like this:
> > >
> > > join_left_side_store.put ( K1-W1, V1 )
> > > join_left_side_store.put ( K1-W1, V2 )
> > > join_right_side_store.put ( K1-W1, VT1 )
> > > join_right_side_store.put ( K1-W1, VT2 )
> > >
> > > Isn't it?
> > >
> > > On Wed, Apr 26, 2017 at 6:50 PM, Damian Guy <damian....@gmail.com> wrote:
> > >
> > > Hi Murad,
> > >
> > > On Wed, 26 Apr 2017 at 13:37 Murad Mamedov <m...@muradm.net> wrote:
> > >
> > > Is there any global time synchronization between streams in Kafka Streams
> > > API? So that, it would not consume more events from one stream while the
> > > other is still behind in time. Or probably better to rephrase it like, is
> > > there global event ordering based on timestamp of event?
> > >
> > > Yes. When streams are joined each partition from the joined streams are
> > > grouped together into a single Task. Each Task maintains a record buffer
> > > for all of the topics it is consuming from. When it is time to process a
> > > record it will choose a record from the partition that has the smallest
> > > timestamp. So in this way it makes a best effort to keep the streams in
> > > sync.
> > >
> > > The other thing could be to join streams in window, however same question
> > > arises, if one stream days behind the other, will the join window of 15
> > > minutes ever work?
> > >
> > > If the data is arriving much later you can use
> > > JoinWindows.until(SOME_TIME_PERIOD) to keep the data around. In this case
> > > the streams will still join. Once SOME_TIME_PERIOD has expired the streams
> > > will no longer be able to join.
> > >
> > > I'm trying to grasp a way on how to design replay of long periods of time
> > > for application with multiple topics/streams. Especially when combining
> > > with low-level API processors and transformers which rely on each other
> > > via GlobalKTable or KTable stores on these streams.
> > > For instance, smaller topic could have the following sequence of events:
> > >
> > > T1 - (k1, v1)
> > > T1 + 10 minutes - (k1, null)
> > > T1 + 20 minutes - (k1, v2)
> > >
> > > While topic with larger events:
> > >
> > > T1 - (k1, vt1)
> > > T1 + 5 minutes - (k1, null)
> > > T1 + 15 minutes - (k1, vt2)
> > >
> > > If one would join or lookup these streams in realtime (timestamp of event
> > > is approximately = wall clock time) result would be:
> > >
> > > T1 - topic_small (k1, v1) - topic_large (k1, vt1)
> > > T1 + 5 minutes - topic_small (k1, v1) - topic_large (k1, null)
> > > T1 + 10 minutes - topic_small (k1, null) - topic_large (k1, null)
> > > T1 + 15 minutes - topic_small (k1, null) - topic_large (k1, vt2)
> > > T1 + 20 minutes - topic_small (k1, v2) - topic_large (k1, vt2)
> > >
> > > However, when replaying streams from beginning, from perspective of topic
> > > with large events, it would see topic with small events as (k1, v2),
> > > completely missing v1 and null states in case of GlobalKTable/KTable
> > > presentation or events in case of KStream-KStream windowed join.
> > >
> > > I don't really follow here. In the case of a GlobalKTable it will be
> > > initialized with all of the existing data before the rest of the streams
> > > start processing.
> > >
> > > Do I miss something here? Should application be responsible in global
> > > synchronization between topics, or Kafka Streams does / can do that? If
> > > application should, then what could be approach to solve it? I hope I
> > > could explain myself. Thanks in advance
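
For anyone following the thread, the selection rule described above (take the next record from whichever buffered partition has the smallest timestamp) can be sketched as a small standalone simulation. This is not Kafka Streams code and uses no Kafka classes: `SmallestTimestampFirst`, `Event`, `replay` and `demo` are hypothetical names chosen for illustration, and the sketch assumes every record is already sitting in its buffer, which a real Task cannot guarantee (hence "best effort"). The timestamps are the minutes after T1 from the topic_small / topic_large example in the thread.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

public class SmallestTimestampFirst {

    /** Hypothetical stand-in for a consumed record: event time (minutes after T1) and value. */
    static final class Event {
        final long timestamp;
        final String value;

        Event(long timestamp, String value) {
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    /**
     * Drains two per-partition buffers, always taking the head record with the
     * smaller timestamp. Assumes both buffers are fully populated up front.
     */
    static List<String> replay(List<Event> left, List<Event> right) {
        Deque<Event> a = new ArrayDeque<>(left);
        Deque<Event> b = new ArrayDeque<>(right);
        List<String> order = new ArrayList<>();
        while (!a.isEmpty() || !b.isEmpty()) {
            Deque<Event> next;
            if (a.isEmpty()) {
                next = b;                      // only the right buffer has data
            } else if (b.isEmpty()) {
                next = a;                      // only the left buffer has data
            } else {
                // Both have data: pick the buffer whose head is oldest.
                next = a.peek().timestamp <= b.peek().timestamp ? a : b;
            }
            order.add(next.poll().value);
        }
        return order;
    }

    /** Replays the example from the thread, starting both topics at offset 0. */
    static List<String> demo() {
        List<Event> topicSmall = Arrays.asList(new Event(0, "V1"), new Event(7, "V2"));
        List<Event> topicLarge = Arrays.asList(new Event(1, "VT1"), new Event(3, "VT2"));
        return replay(topicSmall, topicLarge);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [V1, VT1, VT2, V2]
    }
}
```

Under that assumption the replay order matches the real-time arrival order (V1, VT1, VT2, V2). If one buffer is temporarily empty while the other still has records, the order may differ, which is the situation the question above is concerned with.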