Hi Devs, maybe you can shed some light on the thread below.
Thanks in advance.

---------- Forwarded message ----------
From: "m...@muradm.net" <m...@muradm.net>
Date: 26 Apr 2017 19:53
Subject: Re: Time synchronization between streams
To: <us...@kafka.apache.org>

Yes, basically I'm OK with how the join works, including window and
retention periods, under normal circumstances. When events are processed in
real time, an application joining the streams will get something like this:

T1 + 0      => topic_small (K1, V1)  => join result (None)
T1 + 1 min  => topic_large (K1, VT1) => join result (K1, V1, VT1)
T1 + 3 mins => topic_large (K1, VT2) => join result (K1, V1, VT2)
T1 + 7 mins => topic_small (K1, V2)  => join result (K1, V2, VT2)

According to Windowed<K> and WindowedSerializer, only the window start time
is kept with the key when storing to the state store. Assuming the window
start time is the same for both topics/KStreams (not sure yet, still reading
the source), but even if it is not, the state store operations of Kafka
Streams will be like this:

join_left_side_store.put  ( K1-W1, V1 )
join_right_side_store.put ( K1-W1, VT1 )
join_left_side_store.put  ( K1-W1, V2 )
join_right_side_store.put ( K1-W1, VT2 )

However, when the same application consumes the same topics from the
beginning, from scratch (no local state stores), over a large period of time
(greater than the window period, but less than the retention period), the
join result for a 10-minute window will be different, like this:

join result (None)
join result (K1, V2, VT1)
join result (K1, V2, VT2)

Because topic_large's stream is being read more slowly, the value of
topic_small in the window will change from V1 to V2 before Kafka Streams
receives VT1. I.e. the state store operations of Kafka Streams will be like
this:

join_left_side_store.put  ( K1-W1, V1 )
join_left_side_store.put  ( K1-W1, V2 )
join_right_side_store.put ( K1-W1, VT1 )
join_right_side_store.put ( K1-W1, VT2 )

Isn't it?
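For concreteness, the join I'm describing is roughly the following (just a
sketch against the 0.10.x DSL; the String serdes, the 1-day retention, the
joiner and the output topic name are placeholders, not my actual code):

import java.util.concurrent.TimeUnit;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class SmallLargeJoin {

    public static KStreamBuilder buildTopology() {
        KStreamBuilder builder = new KStreamBuilder();

        // Placeholder serdes: assume both topics are keyed and valued as Strings.
        KStream<String, String> small =
                builder.stream(Serdes.String(), Serdes.String(), "topic_small");
        KStream<String, String> large =
                builder.stream(Serdes.String(), Serdes.String(), "topic_large");

        // 10-minute join window, with windows retained longer via until(),
        // so that a lagging stream can still produce matches during a replay.
        small.join(
                large,
                (v, vt) -> v + "," + vt,              // joined value, e.g. (V1, VT1)
                JoinWindows.of(TimeUnit.MINUTES.toMillis(10))
                           .until(TimeUnit.DAYS.toMillis(1)))
             .to(Serdes.String(), Serdes.String(), "join_result");

        return builder;
    }
}

The until() retention is what keeps older windows around long enough for a
lagging stream to still find matches; it does not change the order in which
the two input streams are consumed.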
On Wed, Apr 26, 2017 at 6:50 PM, Damian Guy <damian....@gmail.com> wrote:

> Hi Murad,
>
> On Wed, 26 Apr 2017 at 13:37 Murad Mamedov <m...@muradm.net> wrote:
>
>> Is there any global time synchronization between streams in the Kafka
>> Streams API, so that it would not consume more events from one stream
>> while the other is still behind in time? Or, probably better to rephrase
>> it: is there a global event ordering based on the timestamp of the event?
>
> Yes. When streams are joined, each partition from the joined streams is
> grouped into a single Task. Each Task maintains a record buffer for all of
> the topics it is consuming from. When it is time to process a record, it
> will choose the record from the partition that has the smallest timestamp.
> So in this way it makes a best effort to keep the streams in sync.
>
>> The other thing could be to join the streams in a window; however, the
>> same question arises: if one stream is days behind the other, will a join
>> window of 15 minutes ever work?
>
> If the data is arriving much later you can use
> JoinWindows.until(SOME_TIME_PERIOD) to keep the data around. In this case
> the streams will still join. Once SOME_TIME_PERIOD has expired the streams
> will no longer be able to join.
>
>> I'm trying to grasp how to design a replay of long periods of time for an
>> application with multiple topics/streams, especially when combining with
>> low-level API processors and transformers which rely on each other via
>> GlobalKTable or KTable stores on these streams.
>>
>> For instance, the smaller topic could have the following sequence of
>> events:
>>
>> T1              - (k1, v1)
>> T1 + 10 minutes - (k1, null)
>> T1 + 20 minutes - (k1, v2)
>>
>> While the topic with larger events:
>>
>> T1              - (k1, vt1)
>> T1 + 5 minutes  - (k1, null)
>> T1 + 15 minutes - (k1, vt2)
>>
>> If one would join or look up these streams in real time (timestamp of the
>> event approximately equal to wall clock time), the result would be:
>>
>> T1              - topic_small (k1, v1)   - topic_large (k1, vt1)
>> T1 + 5 minutes  - topic_small (k1, v1)   - topic_large (k1, null)
>> T1 + 10 minutes - topic_small (k1, null) - topic_large (k1, null)
>> T1 + 15 minutes - topic_small (k1, null) - topic_large (k1, vt2)
>> T1 + 20 minutes - topic_small (k1, v2)   - topic_large (k1, vt2)
>>
>> However, when replaying the streams from the beginning, from the
>> perspective of the topic with large events, it would see the topic with
>> small events as (k1, v2), completely missing the v1 and null states in the
>> case of a GlobalKTable/KTable presentation, or the events in the case of a
>> KStream-KStream windowed join.
>
> I don't really follow here. In the case of a GlobalKTable it will be
> initialized with all of the existing data before the rest of the streams
> start processing.
>
>> Do I miss something here? Should the application be responsible for global
>> synchronization between topics, or does / can Kafka Streams do that? If
>> the application should, then what could be an approach to solve it? I hope
>> I could explain myself.
>>
>> Thanks in advance
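For reference, the GlobalKTable lookup Damian mentions in the thread above
would look roughly like this (again just a sketch against the 0.10.x DSL; the
store name, serdes, key mapper, joiner and output topic are placeholders):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class LargeAgainstSmallLookup {

    public static KStreamBuilder buildTopology() {
        KStreamBuilder builder = new KStreamBuilder();

        // topic_small materialized as a global table. It is bootstrapped with
        // all existing data before the stream below starts processing, so on a
        // full replay the lookup sees the latest value (k1, v2), never the
        // earlier v1 or the tombstone.
        GlobalKTable<String, String> small =
                builder.globalTable(Serdes.String(), Serdes.String(),
                                    "topic_small", "topic-small-store");

        KStream<String, String> large =
                builder.stream(Serdes.String(), Serdes.String(), "topic_large");

        // Non-windowed lookup: each topic_large record is joined against the
        // current value for its key in the global table.
        large.join(
                small,
                (k, vt) -> k,               // key mapper: look up by the record key
                (vt, v) -> v + "," + vt)    // joined value (v, vt)
             .to(Serdes.String(), Serdes.String(), "lookup_result");

        return builder;
    }
}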