As I said in my previous email:
> Yes. When streams are joined, the corresponding partitions of the joined
> streams are grouped together into a single Task. Each Task maintains a
> record buffer for all of the topics it is consuming from. When it is time
> to process a record, it will choose a record from the partition that has
> the smallest timestamp. So in this way it makes a best effort to keep the
> streams in sync.

So if you are joining two topics that both start from offset 0 with no
local state, they will be consumed at roughly the same rate and kept in
sync based on the timestamps extracted from the records.
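
A minimal sketch of that selection rule, in plain Java rather than the actual
Kafka Streams classes. The class and method names are made up for
illustration, timestamps are minutes after T1, and it assumes both buffers
are already filled with the four events from your example:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative model only; none of these names are Kafka Streams classes.
public class TimestampOrderedTask {

    // A buffered record: event timestamp (minutes after T1) and value.
    static final class Rec {
        final long timestamp;
        final String value;

        Rec(long timestamp, String value) {
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    // One FIFO buffer per input topic partition.
    private final Map<String, Deque<Rec>> buffers = new LinkedHashMap<>();

    public void add(String topic, long timestamp, String value) {
        buffers.computeIfAbsent(topic, t -> new ArrayDeque<>())
               .addLast(new Rec(timestamp, value));
    }

    // Picks the head record with the smallest timestamp across all
    // non-empty buffers; returns null once every buffer is empty.
    Rec next() {
        Deque<Rec> best = null;
        for (Deque<Rec> q : buffers.values()) {
            if (q.isEmpty()) {
                continue;
            }
            if (best == null || q.peekFirst().timestamp < best.peekFirst().timestamp) {
                best = q;
            }
        }
        return best == null ? null : best.pollFirst();
    }

    // Drains all buffers and returns the values in processing order.
    public static List<String> processingOrder(TimestampOrderedTask task) {
        List<String> order = new ArrayList<>();
        for (Rec r = task.next(); r != null; r = task.next()) {
            order.add(r.value);
        }
        return order;
    }

    // The four events from the example in this thread.
    public static TimestampOrderedTask exampleTask() {
        TimestampOrderedTask task = new TimestampOrderedTask();
        task.add("topic_small", 0, "V1");
        task.add("topic_small", 7, "V2");
        task.add("topic_large", 1, "VT1");
        task.add("topic_large", 3, "VT2");
        return task;
    }

    public static void main(String[] args) {
        // Prints [V1, VT1, VT2, V2]
        System.out.println(processingOrder(exampleTask()));
    }
}
```

In this model V2 (T1 + 7 mins) is held back until VT1 and VT2 have been
processed, which is the same order as in your real-time case. It is only
best effort: the sketch does not model what happens when one of the buffers
is empty at the moment a record is chosen.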


On Thu, 27 Apr 2017 at 08:02 Murad Mamedov <m...@muradm.net> wrote:

> Hi Devs,
>
> Maybe you can shed some light.
>
> Thanks in advance
> ---------- Forwarded message ----------
> From: "m...@muradm.net" <m...@muradm.net>
> Date: 26 Apr 2017 19:53
> Subject: Re: Time synchronization between streams
> To: <us...@kafka.apache.org>
> Cc:
>
> > Yes, basically I'm OK with how the join works, including window and
> > retention periods, under normal circumstances. When events are consumed
> > in real time, as they occur, the application joining the streams will
> > get something like this:
> >
> > T1 + 0 => topic_small (K1, V1)  => join result (None)
> > T1 + 1 min =>  topic_large (K1, VT1) => join result (K1, V1, VT1)
> > T1 + 3 mins => topic_large (K1, VT2) => join result (K1, V1, VT2)
> > T1 + 7 mins => topic_small (K1, V2) => join result (K1, V2, VT2)
> >
> > According to Windowed<K> and WindowedSerializer, only the start of the
> > window is kept with the key when storing it to the state store. Assuming
> > that the window start time is the same for both topics/KStreams (not sure
> > yet, still reading the source), and even if it is not the same, the state
> > store actions of Kafka Streams will be like this:
> >
> > join_left_side_store.put ( K1-W1, V1 )
> > join_right_side_store.put ( K1-W1, VT1 )
> > join_left_side_store.put ( K1-W1, V2 )
> > join_right_side_store.put ( K1-W1, VT2 )
> >
> > However, when the same application consumes the same topics from the
> > beginning, from scratch (no application local state stores), over a
> > large period of time (greater than the window period, but less than the
> > retention period), the join result for a 10-minute window will be
> > different, like this:
> >
> > join result (None)
> > join result (K1, V2, VT1)
> > join result (K1, V2, VT2)
> >
> > Because topic_large's stream is being read more slowly, the value of
> > topic_small in the window will change from V1 to V2 before Kafka Streams
> > receives VT1.
> >
> > I.e. the state store actions of Kafka Streams will be like this:
> >
> > join_left_side_store.put ( K1-W1, V1 )
> > join_left_side_store.put ( K1-W1, V2 )
> > join_right_side_store.put ( K1-W1, VT1 )
> > join_right_side_store.put ( K1-W1, VT2 )
> >
> > Is that right?
> >
> > On Wed, Apr 26, 2017 at 6:50 PM, Damian Guy <damian....@gmail.com> wrote:
> >
> > Hi Murad,
> >
> > On Wed, 26 Apr 2017 at 13:37 Murad Mamedov <m...@muradm.net> wrote:
> >
> > Is there any global time synchronization between streams in the Kafka
> > Streams API, so that it would not consume more events from one stream
> > while the other is still behind in time? Or, probably better to rephrase
> > it: is there global event ordering based on the timestamp of the event?
> >
> > Yes. When streams are joined, the corresponding partitions of the joined
> > streams are grouped together into a single Task. Each Task maintains a
> > record buffer for all of the topics it is consuming from. When it is time
> > to process a record, it will choose a record from the partition that has
> > the smallest timestamp. So in this way it makes a best effort to keep the
> > streams in sync.
> >
> > The other option could be to join the streams in a window. However, the
> > same question arises: if one stream is days behind the other, will a join
> > window of 15 minutes ever work?
> >
> > If the data is arriving much later you can use
> > JoinWindows.until(SOME_TIME_PERIOD) to keep the data around. In this case
> > the streams will still join. Once SOME_TIME_PERIOD has expired the
> > streams will no longer be able to join.
> >
> > I'm trying to work out how to design replay of long periods of time for
> > an application with multiple topics/streams, especially when combined
> > with low-level API processors and transformers which rely on each other
> > via GlobalKTable or KTable stores on these streams. For instance, the
> > smaller topic could have the following sequence of events:
> >
> > T1 - (k1, v1)
> > T1 + 10 minutes - (k1, null)
> > T1 + 20 minutes - (k1, v2)
> >
> > While the topic with larger events has:
> >
> > T1 - (k1, vt1)
> > T1 + 5 minutes - (k1, null)
> > T1 + 15 minutes - (k1, vt2)
> >
> > If one would join or look up these streams in real time (timestamp of
> > event is approximately = wall clock time), the result would be:
> >
> > T1 - topic_small (k1, v1) - topic_large (k1, vt1)
> > T1 + 5 minutes - topic_small (k1, v1) - topic_large (k1, null)
> > T1 + 10 minutes - topic_small (k1, null) - topic_large (k1, null)
> > T1 + 15 minutes - topic_small (k1, null) - topic_large (k1, vt2)
> > T1 + 20 minutes - topic_small (k1, v2) - topic_large (k1, vt2)
> >
> > However, when replaying the streams from the beginning, from the
> > perspective of the topic with large events, it would see the topic with
> > small events as (k1, v2), completely missing the v1 and null states in
> > the case of a GlobalKTable/KTable representation, or the events in the
> > case of a KStream-KStream windowed join.
> >
> > I don't really follow here. In the case of a GlobalKTable it will be
> > initialized with all of the existing data before the rest of the streams
> > start processing.
> >
> > Am I missing something here? Should the application be responsible for
> > global synchronization between topics, or does / can Kafka Streams do
> > that? If the application should, then what could be an approach to solve
> > it? I hope I explained myself clearly. Thanks in advance.
> >
> >
>
